From dac24f140039f48ab4dc9f04ca6ded9718cf3e54 Mon Sep 17 00:00:00 2001
From: Franklin-F
Date: Tue, 3 Jun 2025 09:29:00 +0800
Subject: [PATCH] fix: add duplicate-video flag and improve the deadlock retry
 mechanism
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 DB.py | 44 ++++++++++++++++++++++++++++++++------------
 1 file changed, 32 insertions(+), 12 deletions(-)

diff --git a/DB.py b/DB.py
index be2a6ee..c456181 100644
--- a/DB.py
+++ b/DB.py
@@ -10,6 +10,9 @@ from sqlalchemy import (
     BigInteger, Integer, String, Text, DateTime, tuple_
 )
 from sqlalchemy.dialects.mysql import insert as mysql_insert
+from sqlalchemy.exc import OperationalError
+import time  # used by _execute_with_deadlock_retry; harmless if already imported
+
 from logger import logger
 from datetime import datetime
 
@@ -66,6 +68,7 @@ video = Table("sh_dm_video_v2", _meta,
     Column("watch_number", Integer),
     Column("follow_number", Integer),
     Column("video_number", Integer),
+    Column("is_repeat", Integer),
 )
 video_author = Table(
     "sh_dm_video_author",
@@ -604,7 +607,6 @@ class DBSA:
 
         existing_keys = set()
         if vid_rows:
-            # Collect (v_xid, rn) pairs; they must match the unique index on the video table
             all_keys = list({(row["v_xid"], row["rn"]) for row in vid_rows})
             conn = _engine.connect()
             try:
@@ -624,12 +626,15 @@ class DBSA:
             finally:
                 conn.close()
 
+        # First flag op_rows for the log table: is_repeat = 1 (duplicate) or 0 (not)
         for i, vid_row in enumerate(vid_rows):
             key = (vid_row["v_xid"], vid_row["rn"])
-            if key in existing_keys:
-                op_rows[i]["is_repeat"] = 1
-            else:
-                op_rows[i]["is_repeat"] = 2
+            op_rows[i]["is_repeat"] = 1 if key in existing_keys else 0
+
+        # Then copy the same is_repeat onto vid_rows so the video table stores 0/1 too
+        for i, vid_row in enumerate(vid_rows):
+            vid_row["is_repeat"] = op_rows[i]["is_repeat"]
+            vid_row.pop("level", None)
 
         # The author-table, log-table and video-table writes below are unchanged...
         authors_map = {}
@@ -653,6 +658,7 @@ class DBSA:
 
         author_rows = list(authors_map.values())
         if author_rows:
+            author_rows.sort(key=lambda x: x["u_xid"])
             stmt_author = mysql_insert(video_author).values(author_rows)
             upd_author = {
                 "u_name": stmt_author.inserted.u_name,
@@ -665,10 +671,9 @@ class DBSA:
             }
             ondup_author = stmt_author.on_duplicate_key_update(**upd_author)
             try:
-                with _engine.begin() as conn2:
-                    conn2.execute(ondup_author)
+                cls._execute_with_deadlock_retry(ondup_author)
             except Exception as e:
-                logger.info(f"[DBSA] author-table write failed: {e}")
+                logger.info(f"[DBSA] author-table write failed (even after deadlock retries): {e}")
                 try:
                     cls.push_record_many(payloads)
                 except Exception as re:
@@ -686,10 +691,6 @@ class DBSA:
                 logger.info("[Redis fallback failed]", re)
             return
 
-        for vid_row in vid_rows:
-            vid_row.pop("is_repeat", None)
-            vid_row.pop("level", None)
-
         if vid_rows:
             try:
                 cls._bulk_upsert(vid_rows)
@@ -737,3 +738,21 @@ class DBSA:
             daemon=True
         )
         thread.start()
+
+    @classmethod
+    def _execute_with_deadlock_retry(cls, statement, max_retries=3):
+        for attempt in range(max_retries):
+            try:
+                with _engine.begin() as conn:
+                    conn.execute(statement)
+                return
+            except OperationalError as e:
+                # MySQL error code 1213 signals a deadlock
+                code = None
+                if hasattr(e.orig, "args") and len(e.orig.args) >= 1:
+                    code = e.orig.args[0]
+                if code == 1213 and attempt < max_retries - 1:
+                    time.sleep(1)
+                    continue
+                # Not a deadlock, or the retry limit was reached: re-raise
+                raise
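
Note on the duplicate check: the query that fills existing_keys lies just outside
this hunk's context, but the tuple_ import at the top of DB.py suggests a
row-value IN lookup on the (v_xid, rn) unique index. A sketch of that pattern,
assuming the video Table object from this file, an open connection conn, and the
all_keys list built above (the actual query in DB.py may differ):

    from sqlalchemy import select, tuple_

    # Match rows on the composite unique key (v_xid, rn)
    stmt = select(video.c.v_xid, video.c.rn).where(
        tuple_(video.c.v_xid, video.c.rn).in_(all_keys)
    )
    # Anything already present in sh_dm_video_v2 is a duplicate (is_repeat = 1)
    existing_keys = {tuple(row) for row in conn.execute(stmt)}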
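
Note on the new author_rows.sort(key=lambda x: x["u_xid"]): this line does much
of the anti-deadlock work. A multi-row INSERT ... ON DUPLICATE KEY UPDATE
acquires InnoDB row locks in the order the rows appear in the statement, so two
workers upserting overlapping author batches in different orders can each end up
waiting on a lock the other holds (an AB/BA deadlock). Sorting every batch by
the same unique key gives all transactions one global lock order. A toy
illustration with made-up rows:

    # Two workers about to upsert overlapping authors in opposite orders:
    batch_worker_1 = [{"u_xid": "b"}, {"u_xid": "a"}]
    batch_worker_2 = [{"u_xid": "a"}, {"u_xid": "b"}]

    # After sorting, both statements lock row "a" before row "b", so neither
    # transaction can hold a lock the other is already waiting on.
    for batch in (batch_worker_1, batch_worker_2):
        batch.sort(key=lambda row: row["u_xid"])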
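
Note on _execute_with_deadlock_retry: the helper sleeps a flat 1 s between
attempts, which is fine for occasional deadlocks. If contention stays high,
exponential backoff with a little jitter spreads retries out further; a possible
follow-up, sketched here with illustrative names rather than code from this
patch:

    import random
    import time

    from sqlalchemy.exc import OperationalError

    MYSQL_DEADLOCK = 1213  # MySQL's ER_LOCK_DEADLOCK error code

    def execute_with_backoff(engine, statement, max_retries=3, base_delay=0.5):
        for attempt in range(max_retries):
            try:
                with engine.begin() as conn:  # one transaction per attempt
                    conn.execute(statement)
                return
            except OperationalError as exc:
                args = getattr(exc.orig, "args", None)
                code = args[0] if args else None
                if code == MYSQL_DEADLOCK and attempt < max_retries - 1:
                    # Sleep 0.5 s, 1 s, 2 s, ... plus jitter so concurrent
                    # retries do not all wake up at the same moment
                    time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.1))
                    continue
                raise  # not a deadlock, or retries exhausted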