feat: 实现原子操作以清空并写入report_queue数据

This commit is contained in:
晓丰 2025-06-30 21:52:42 +08:00
parent 521e3770b0
commit 3e5ec774ca

329
DB.py
View File

@ -675,15 +675,13 @@ class DBVidcon:
class DBSA: class DBSA:
FLUSH_EVERY_ROWS = 100 # 行阈值 FLUSH_EVERY_ROWS = 100 # 行阈值
FLUSH_INTERVAL = 30 # 秒阈值 FLUSH_INTERVAL = 30 # 秒阈值
_buf_op: list = [] _buf_op: list = []
_buf_vid: list = [] _buf_vid: list = []
_buf_payload: list = [] _buf_payload: list = []
_last_flush: float = time.time() _last_flush: float = time.time()
_lock = threading.Lock() _lock = threading.Lock()
_existing_op_keys = set() # 用于操作记录表的索引键
_existing_vid_keys = set() # 用于视频表的唯一索引键
@staticmethod @staticmethod
def push_record_many(rows): def push_record_many(rows):
@ -693,7 +691,6 @@ class DBSA:
@classmethod @classmethod
def upsert_video(cls, data): def upsert_video(cls, data):
data = copy.deepcopy(data) data = copy.deepcopy(data)
# 设置默认值
data.setdefault("a_id", 0) data.setdefault("a_id", 0)
data.setdefault("history_status", "") data.setdefault("history_status", "")
data.setdefault("is_repeat", 3) data.setdefault("is_repeat", 3)
@ -702,89 +699,62 @@ class DBSA:
now_ts = int(time.time()) now_ts = int(time.time())
# 生成操作记录表的唯一索引键 op_row = {
op_index_key = ( "v_id": data["v_id"],
data["rn"] or "", "v_xid": data["v_xid"],
data["v_name"] or "", "a_id": data["a_id"],
data["keyword"] or "", "level": data["level"],
data["v_xid"] or "", "name_title": data["v_name"],
now_ts "keyword": data["keyword"],
) "rn": data["rn"],
"history_status": data["history_status"],
"is_repeat": 3,
"sort": data["sort"],
"createtime": now_ts,
"updatetime": now_ts,
"operatetime": now_ts,
"batch": data["batch"],
"machine": data.get("machine_id", 0),
"is_piracy": data.get("is_piracy", '3'),
"ts_status": data.get("ts_status", 1),
}
# 生成视频表的唯一索引键 vid_row = {
vid_index_key = ( "v_id": data["v_id"],
data["v_xid"] or "", "v_xid": data["v_xid"],
data["v_name"] or "" "v_name": data["v_name"],
) "title": data["title"],
"link": data["link"],
"edition": "",
"duration": str(data["duration"]) if data.get("duration") else '0',
"public_time": data["create_time"],
"cover_pic": data["cover_pic"],
"sort": data["sort"],
"u_xid": data["u_xid"],
"u_id": data["u_id"],
"u_pic": data["u_pic"],
"u_name": data["u_name"],
"status": 1,
"createtime": now_ts,
"updatetime": now_ts,
"operatetime": now_ts,
"watch_number": data.get("view", 0),
"follow_number": data.get("fans", 0),
"video_number": data.get("videos", 0),
"is_repeat": 3,
"is_piracy": data.get("is_piracy", 3),
"ts_status": data.get("ts_status", 1),
}
with cls._lock: with cls._lock:
# 检查两个索引键是否已存在
if op_index_key in cls._existing_op_keys:
logger.debug(f"跳过重复操作记录: {op_index_key}")
return
if vid_index_key in cls._existing_vid_keys:
logger.debug(f"跳过重复视频记录: {vid_index_key}")
return
# 生成操作记录行
op_row = {
"v_id": data["v_id"],
"v_xid": data["v_xid"],
"a_id": data["a_id"],
"level": data["level"],
"name_title": data["v_name"],
"keyword": data["keyword"],
"rn": data["rn"],
"history_status": data["history_status"],
"is_repeat": data["is_repeat"],
"sort": data["sort"],
"createtime": now_ts,
"updatetime": now_ts,
"operatetime": now_ts,
"batch": data["batch"],
"machine": data.get("machine_id", 0),
"is_piracy": data.get("is_piracy", '3'),
"ts_status": data.get("ts_status", 1),
}
# 视频行生成
vid_row = {
"v_id": data["v_id"],
"v_xid": data["v_xid"],
"v_name": data["v_name"],
"title": data["title"],
"link": data["link"],
"edition": "",
"duration": str(data["duration"]) if data.get("duration") else '0',
"public_time": data["create_time"],
"cover_pic": data["cover_pic"],
"sort": data["sort"],
"u_xid": data["u_xid"],
"u_id": data["u_id"],
"u_pic": data["u_pic"],
"u_name": data["u_name"],
"status": 1,
"createtime": now_ts,
"updatetime": now_ts,
"operatetime": now_ts,
"watch_number": data.get("view", 0),
"follow_number": data.get("fans", 0),
"video_number": data.get("videos", 0),
"is_repeat": 0, # 初始设为0flush时更新
"is_piracy": data.get("is_piracy", 3),
"ts_status": data.get("ts_status", 1),
}
cls._buf_op.append(op_row) cls._buf_op.append(op_row)
cls._buf_vid.append(vid_row) cls._buf_vid.append(vid_row)
cls._buf_payload.append(data) cls._buf_payload.append(data)
cls._existing_op_keys.add(op_index_key)
cls._existing_vid_keys.add(vid_index_key)
buf_len = len(cls._buf_vid)
buf_len = len(cls._buf_vid)
need_flush = False need_flush = False
flush_reason = "" flush_reason = ""
if buf_len >= cls.FLUSH_EVERY_ROWS: if buf_len >= cls.FLUSH_EVERY_ROWS:
need_flush = True need_flush = True
flush_reason = "ROWS" flush_reason = "ROWS"
@ -806,105 +776,17 @@ class DBSA:
cls._buf_vid.clear() cls._buf_vid.clear()
cls._buf_payload.clear() cls._buf_payload.clear()
cls._last_flush = time.time() cls._last_flush = time.time()
existing_op_keys_batch = cls._existing_op_keys.copy()
existing_vid_keys_batch = cls._existing_vid_keys.copy()
cls._existing_op_keys.clear()
cls._existing_vid_keys.clear()
if not op_rows and not vid_rows: if not op_rows and not vid_rows:
return return
# ========== 视频表重复检测 ========== # 所有 is_repeat 统一为 3
vid_existing_keys = set()
if vid_rows:
# 生成所有视频唯一键 (v_xid, v_name)
all_vid_keys = list({(row["v_xid"], row["v_name"]) for row in vid_rows})
if all_vid_keys: # 仅当有键时才查询
conn = _engine.connect()
try:
# 使用SQLAlchemy 1.4+的select语法
sel_vid = select(video.c.v_xid, video.c.v_name).where(
tuple_(video.c.v_xid, video.c.v_name).in_(all_vid_keys)
)
result = conn.execute(sel_vid)
vid_existing_keys = {(row.v_xid, row.v_name) for row in result}
except Exception as e:
logger.error(f"查询视频表时异常: {e}")
try:
cls.push_record_many(payloads)
except Exception as re:
logger.error("[Redis 回退失败]", re)
return
finally:
conn.close()
# 设置重复标志
for i, vid_row in enumerate(vid_rows): for i, vid_row in enumerate(vid_rows):
key = (vid_row["v_xid"], vid_row["v_name"]) vid_row["is_repeat"] = 3
vid_row["is_repeat"] = 1 if key in vid_existing_keys else 0
# 同时设置操作记录的重复标志
if i < len(op_rows): if i < len(op_rows):
op_rows[i]["is_repeat"] = vid_row["is_repeat"] op_rows[i]["is_repeat"] = 3
# ========== 操作记录表去重 ========== # 处理作者表
final_op_rows = []
if op_rows:
# 生成所有操作记录索引键
op_check_keys = [
(row["rn"], row["name_title"], row["keyword"], row["v_xid"], row["createtime"])
for row in op_rows
]
if op_check_keys: # 仅当有键时才查询
try:
# 查询数据库已存在的操作记录
conn = _engine.connect()
# 使用SQLAlchemy 1.4+的select语法
stmt = select(
video_op.c.rn,
video_op.c.name_title,
video_op.c.keyword,
video_op.c.v_xid,
video_op.c.createtime
).where(
tuple_(
video_op.c.rn,
video_op.c.name_title,
video_op.c.keyword,
video_op.c.v_xid,
video_op.c.createtime
).in_(op_check_keys)
)
result = conn.execute(stmt)
existing_db_op_keys = {tuple(row) for row in result}
conn.close()
except Exception as e:
logger.error(f"查询操作记录表失败: {e}")
try:
cls.push_record_many(payloads)
except Exception as re:
logger.error("Redis退回失败", re)
return
# 过滤重复操作记录
for idx, row in enumerate(op_rows):
index_key = (
row["rn"],
row["name_title"],
row["keyword"],
row["v_xid"],
row["createtime"]
)
# 检查是否在数据库或当前批次中重复
if index_key not in existing_db_op_keys and index_key not in existing_op_keys_batch:
final_op_rows.append(row)
existing_op_keys_batch.add(index_key)
else:
final_op_rows = op_rows[:] # 如果没有键,直接使用所有行
# ========== 作者表处理 ==========
authors_map = {} authors_map = {}
now_ts = int(time.time()) now_ts = int(time.time())
for data in payloads: for data in payloads:
@ -948,62 +830,56 @@ class DBSA:
logger.error("[Redis 回退失败]", re) logger.error("[Redis 回退失败]", re)
return return
# ========== 操作记录表写入 ========== # 插入操作记录表
if final_op_rows: try:
op_rows.sort(key=lambda x: (
x['rn'] or '',
x['name_title'] or '',
x['keyword'] or '',
x['v_xid'] or '',
x['createtime']
))
stmt = video_op.insert().values(op_rows)
with _engine.begin() as conn:
conn.execute(stmt)
logger.info(f"插入操作记录: {len(op_rows)}")
except Exception as e:
logger.error(f"插入操作记录失败: {e}")
try: try:
# 按索引字段排序优化插入性能 cls.push_record_many(payloads)
final_op_rows.sort(key=lambda x: ( except Exception as re:
x['rn'] or '', logger.error("[Redis 回退失败]", re)
x['name_title'] or '', return
x['keyword'] or '',
x['v_xid'] or '',
x['createtime']
))
stmt = video_op.insert().values(final_op_rows) # 插入/更新视频表
with _engine.begin() as conn: try:
conn.execute(stmt) stmt = mysql_insert(video).values(vid_rows)
logger.info(f"插入操作记录: {len(final_op_rows)}") upd = {
except Exception as e: "title": stmt.inserted.title,
logger.error(f"插入操作记录失败: {e}") "duration": stmt.inserted.duration,
try: "cover_pic": stmt.inserted.cover_pic,
cls.push_record_many(payloads) "sort": stmt.inserted.sort,
except Exception as re: "updatetime": stmt.inserted.updatetime,
logger.error("[Redis 回退失败]", re) "watch_number": stmt.inserted.watch_number,
return "follow_number": stmt.inserted.follow_number,
"video_number": stmt.inserted.video_number,
# ========== 视频表写入 ========== "is_repeat": stmt.inserted.is_repeat,
if vid_rows: "is_piracy": stmt.inserted.is_piracy,
"ts_status": stmt.inserted.ts_status,
}
ondup = stmt.on_duplicate_key_update(**upd)
with _engine.begin() as conn:
conn.execute(ondup)
logger.info(
f"[DBSA] flush 完成authors={len(author_rows)} 条, "
f"op={len(op_rows)} 条, video={len(vid_rows)}"
)
except Exception as e:
logger.error(f"写视频表失败: {e}")
try: try:
# 使用新的唯一键 (v_xid, v_name) 进行 upsert cls.push_record_many(payloads)
stmt = mysql_insert(video).values(vid_rows) except Exception as re:
upd = { logger.error("[Redis 回退失败]", re)
"title": stmt.inserted.title,
"duration": stmt.inserted.duration,
"cover_pic": stmt.inserted.cover_pic,
"sort": stmt.inserted.sort,
"updatetime": stmt.inserted.updatetime,
"watch_number": stmt.inserted.watch_number,
"follow_number": stmt.inserted.follow_number,
"video_number": stmt.inserted.video_number,
"is_repeat": stmt.inserted.is_repeat,
"is_piracy": stmt.inserted.is_piracy,
"ts_status": stmt.inserted.ts_status,
}
ondup = stmt.on_duplicate_key_update(**upd)
with _engine.begin() as conn:
conn.execute(ondup)
logger.info(
f"[DBSA] flush 完成authors={len(author_rows)} 条, "
f"op={len(final_op_rows)} 条, video={len(vid_rows)}"
)
except Exception as e:
logger.error(f"写视频表失败: {e}")
try:
cls.push_record_many(payloads)
except Exception as re:
logger.error("[Redis 回退失败]", re)
@classmethod @classmethod
def update_video_stats(cls, locator: dict, stats: dict) -> int: def update_video_stats(cls, locator: dict, stats: dict) -> int:
@ -1014,7 +890,6 @@ class DBSA:
params = dict(stats) params = dict(stats)
params["updatetime"] = int(time.time()) params["updatetime"] = int(time.time())
# 过滤只保留 video 表中存在的列
valid_cols = set(video.c.keys()) valid_cols = set(video.c.keys())
filtered_params = {k: v for k, v in params.items() if k in valid_cols} filtered_params = {k: v for k, v in params.items() if k in valid_cols}
@ -1030,9 +905,6 @@ class DBSA:
@classmethod @classmethod
def update_video_stats_async(cls, locator: dict, stats: dict) -> None: def update_video_stats_async(cls, locator: dict, stats: dict) -> None:
"""
异步更新 sh_dm_video_v3 表中的统计字段，立即返回，不阻塞调用线程。
"""
thread = threading.Thread( thread = threading.Thread(
target=cls.update_video_stats, target=cls.update_video_stats,
args=(locator, stats), args=(locator, stats),
@ -1048,9 +920,8 @@ class DBSA:
conn.execute(statement) conn.execute(statement)
return return
except OperationalError as e: except OperationalError as e:
# 检查是否为死锁错误
code = getattr(e.orig, "args", [None])[0] code = getattr(e.orig, "args", [None])[0]
if code == 1213 and attempt < 2: # 1213 是MySQL死锁错误码 if code == 1213 and attempt < 2:
time.sleep(0.5 * (attempt + 1)) time.sleep(0.5 * (attempt + 1))
continue continue
raise raise