feat: 优化DB.py中的操作记录和视频表的UPSERT逻辑,添加错误处理和日志记录

This commit is contained in:
晓丰 2025-07-01 19:01:51 +08:00
parent 5a02e98438
commit 125eb12a8f

38
DB.py
View File

@ -771,6 +771,7 @@ class DBSA:
logger.debug(f"flush 操作记录数: {len(op_rows)}") logger.debug(f"flush 操作记录数: {len(op_rows)}")
logger.debug(f"flush 视频记录数: {len(vid_rows)}") logger.debug(f"flush 视频记录数: {len(vid_rows)}")
# 作者表处理
now_ts = int(time.time()) now_ts = int(time.time())
authors_map = {} authors_map = {}
for data in payloads: for data in payloads:
@ -809,19 +810,34 @@ class DBSA:
cls._execute_with_deadlock_retry(ondup) cls._execute_with_deadlock_retry(ondup)
except Exception as e: except Exception as e:
logger.error(f"写作者表失败: {e}") logger.error(f"写作者表失败: {e}")
cls.push_record_many(payloads) try:
cls.push_record_many(payloads)
except Exception as re:
logger.error("[Redis 回退失败]", re)
return return
# 操作记录 UPSERT解决唯一索引冲突
try: try:
stmt = video_op.insert().values(op_rows) stmt = mysql_insert(video_op).values(op_rows)
ondup = stmt.on_duplicate_key_update(
updatetime=stmt.inserted.updatetime,
operatetime=stmt.inserted.operatetime,
ts_status=stmt.inserted.ts_status,
is_repeat=stmt.inserted.is_repeat,
history_status=stmt.inserted.history_status,
)
with _engine.begin() as conn: with _engine.begin() as conn:
conn.execute(stmt) conn.execute(ondup)
logger.info(f"插入操作记录: {len(op_rows)}") logger.info(f"插入/更新操作记录: {len(op_rows)}")
except Exception as e: except Exception as e:
logger.error(f"插入操作记录失败: {e}") logger.error(f"操作记录 UPSERT 失败: {e}")
cls.push_record_many(payloads) try:
cls.push_record_many(payloads)
except Exception as re:
logger.error("[Redis 回退失败]", re)
return return
# 视频表 UPSERT
try: try:
stmt = mysql_insert(video).values(vid_rows) stmt = mysql_insert(video).values(vid_rows)
upd = { upd = {
@ -833,14 +849,20 @@ class DBSA:
"watch_number": stmt.inserted.watch_number, "watch_number": stmt.inserted.watch_number,
"follow_number": stmt.inserted.follow_number, "follow_number": stmt.inserted.follow_number,
"video_number": stmt.inserted.video_number, "video_number": stmt.inserted.video_number,
"is_repeat": stmt.inserted.is_repeat,
"ts_status": stmt.inserted.ts_status,
} }
ondup = stmt.on_duplicate_key_update(**upd) ondup = stmt.on_duplicate_key_update(**upd)
with _engine.begin() as conn: with _engine.begin() as conn:
conn.execute(ondup) conn.execute(ondup)
logger.info(f"[DBSA] flush 完成authors={len(author_rows)} 条, op={len(op_rows)} 条, video={len(vid_rows)}") logger.info(
f"[DBSA] flush 完成authors={len(author_rows)} 条, op={len(op_rows)} 条, video={len(vid_rows)}")
except Exception as e: except Exception as e:
logger.error(f"写视频表失败: {e}") logger.error(f"写视频表失败: {e}")
cls.push_record_many(payloads) try:
cls.push_record_many(payloads)
except Exception as re:
logger.error("[Redis 回退失败]", re)
@classmethod @classmethod
def _execute_with_deadlock_retry(cls, statement): def _execute_with_deadlock_retry(cls, statement):