diff --git a/DB.py b/DB.py index 8f8976d..6e3eaa3 100644 --- a/DB.py +++ b/DB.py @@ -771,6 +771,7 @@ class DBSA: logger.debug(f"flush 操作记录数: {len(op_rows)}") logger.debug(f"flush 视频记录数: {len(vid_rows)}") + # 作者表处理 now_ts = int(time.time()) authors_map = {} for data in payloads: @@ -809,19 +810,34 @@ class DBSA: cls._execute_with_deadlock_retry(ondup) except Exception as e: logger.error(f"写作者表失败: {e}") - cls.push_record_many(payloads) + try: + cls.push_record_many(payloads) + except Exception as re: + logger.error("[Redis 回退失败]", re) return + # 操作记录 UPSERT(解决唯一索引冲突) try: - stmt = video_op.insert().values(op_rows) + stmt = mysql_insert(video_op).values(op_rows) + ondup = stmt.on_duplicate_key_update( + updatetime=stmt.inserted.updatetime, + operatetime=stmt.inserted.operatetime, + ts_status=stmt.inserted.ts_status, + is_repeat=stmt.inserted.is_repeat, + history_status=stmt.inserted.history_status, + ) with _engine.begin() as conn: - conn.execute(stmt) - logger.info(f"插入操作记录: {len(op_rows)} 条") + conn.execute(ondup) + logger.info(f"插入/更新操作记录: {len(op_rows)} 条") except Exception as e: - logger.error(f"插入操作记录失败: {e}") - cls.push_record_many(payloads) + logger.error(f"操作记录 UPSERT 失败: {e}") + try: + cls.push_record_many(payloads) + except Exception as re: + logger.error("[Redis 回退失败]", re) return + # 视频表 UPSERT try: stmt = mysql_insert(video).values(vid_rows) upd = { @@ -833,14 +849,20 @@ class DBSA: "watch_number": stmt.inserted.watch_number, "follow_number": stmt.inserted.follow_number, "video_number": stmt.inserted.video_number, + "is_repeat": stmt.inserted.is_repeat, + "ts_status": stmt.inserted.ts_status, } ondup = stmt.on_duplicate_key_update(**upd) with _engine.begin() as conn: conn.execute(ondup) - logger.info(f"[DBSA] flush 完成:authors={len(author_rows)} 条, op={len(op_rows)} 条, video={len(vid_rows)} 条") + logger.info( + f"[DBSA] flush 完成:authors={len(author_rows)} 条, op={len(op_rows)} 条, video={len(vid_rows)} 条") except Exception as e: logger.error(f"写视频表失败: {e}") - cls.push_record_many(payloads) + try: + cls.push_record_many(payloads) + except Exception as re: + logger.error("[Redis 回退失败]", re) @classmethod def _execute_with_deadlock_retry(cls, statement):