fix: 添加重复视频标识并优化死锁重试机制

This commit is contained in:
晓丰 2025-06-03 09:29:00 +08:00
parent ff612f09fd
commit dac24f1400

44
DB.py
View File

@@ -10,6 +10,8 @@ from sqlalchemy import (
BigInteger, Integer, String, Text, DateTime, tuple_ BigInteger, Integer, String, Text, DateTime, tuple_
) )
from sqlalchemy.dialects.mysql import insert as mysql_insert from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.exc import OperationalError
from logger import logger from logger import logger
from datetime import datetime from datetime import datetime
@@ -66,6 +68,7 @@ video = Table("sh_dm_video_v2", _meta,
Column("watch_number", Integer), Column("watch_number", Integer),
Column("follow_number", Integer), Column("follow_number", Integer),
Column("video_number", Integer), Column("video_number", Integer),
Column("is_repeat", Integer),
) )
video_author = Table( video_author = Table(
"sh_dm_video_author", "sh_dm_video_author",
@@ -604,7 +607,6 @@ class DBSA:
existing_keys = set() existing_keys = set()
if vid_rows: if vid_rows:
# 收集 (v_xid, rn) 对,应与 video 表中的唯一索引匹配
all_keys = list({(row["v_xid"], row["rn"]) for row in vid_rows}) all_keys = list({(row["v_xid"], row["rn"]) for row in vid_rows})
conn = _engine.connect() conn = _engine.connect()
try: try:
@@ -624,12 +626,15 @@
finally: finally:
conn.close() conn.close()
# 先给日志表的 op_rows 设置 is_repeat 标记0/11=重复0=不重复
for i, vid_row in enumerate(vid_rows): for i, vid_row in enumerate(vid_rows):
key = (vid_row["v_xid"], vid_row["rn"]) key = (vid_row["v_xid"], vid_row["rn"])
if key in existing_keys: op_rows[i]["is_repeat"] = 1 if key in existing_keys else 0
op_rows[i]["is_repeat"] = 1
else: # 再把同样的 is_repeat 值写到 vid_rows以便视频表也能存到 0/1
op_rows[i]["is_repeat"] = 2 for i, vid_row in enumerate(vid_rows):
vid_row["is_repeat"] = op_rows[i]["is_repeat"]
vid_row.pop("level", None)
# 以下作者表、日志表和视频表写入逻辑保持不变... # 以下作者表、日志表和视频表写入逻辑保持不变...
authors_map = {} authors_map = {}
@@ -653,6 +658,7 @@ class DBSA:
author_rows = list(authors_map.values()) author_rows = list(authors_map.values())
if author_rows: if author_rows:
author_rows.sort(key=lambda x: x["u_xid"])
stmt_author = mysql_insert(video_author).values(author_rows) stmt_author = mysql_insert(video_author).values(author_rows)
upd_author = { upd_author = {
"u_name": stmt_author.inserted.u_name, "u_name": stmt_author.inserted.u_name,
@@ -665,10 +671,9 @@
} }
ondup_author = stmt_author.on_duplicate_key_update(**upd_author) ondup_author = stmt_author.on_duplicate_key_update(**upd_author)
try: try:
with _engine.begin() as conn2: cls._execute_with_deadlock_retry(ondup_author)
conn2.execute(ondup_author)
except Exception as e: except Exception as e:
logger.info(f"[DBSA] 写作者表失败: {e}") logger.info(f"[DBSA] 写作者表失败(死锁重试后仍未成功): {e}")
try: try:
cls.push_record_many(payloads) cls.push_record_many(payloads)
except Exception as re: except Exception as re:
@@ -686,10 +691,6 @@
logger.info("[Redis 回退失败]", re) logger.info("[Redis 回退失败]", re)
return return
for vid_row in vid_rows:
vid_row.pop("is_repeat", None)
vid_row.pop("level", None)
if vid_rows: if vid_rows:
try: try:
cls._bulk_upsert(vid_rows) cls._bulk_upsert(vid_rows)
@@ -737,3 +738,22 @@ class DBSA:
daemon=True daemon=True
) )
thread.start() thread.start()
@classmethod
def _execute_with_deadlock_retry(cls, statement, max_retries=3, retry_delay=1.0):
    """Execute *statement* in its own transaction, retrying on MySQL deadlocks.

    MySQL signals a deadlock with error code 1213 (ER_LOCK_DEADLOCK); such
    transactions are safe to retry because InnoDB rolls the victim back
    automatically. Any other ``OperationalError`` — or a deadlock that
    persists past the final attempt — is re-raised to the caller.

    Args:
        statement: A SQLAlchemy executable (e.g. an INSERT ... ON DUPLICATE
            KEY UPDATE construct) run inside ``_engine.begin()``.
        max_retries: Total number of attempts before giving up (default 3,
            matching the original hard-coded behavior).
        retry_delay: Seconds to sleep between deadlock retries (default 1.0).

    Raises:
        OperationalError: If the error is not a deadlock, or the deadlock
            survives ``max_retries`` attempts.
    """
    for attempt in range(max_retries):
        try:
            # Each attempt runs in a fresh transaction; commit on success.
            with _engine.begin() as conn:
                conn.execute(statement)
            return
        except OperationalError as exc:
            # exc.orig is the raw DB-API error; its first arg is the MySQL
            # error code. getattr/guarded indexing avoids AttributeError /
            # IndexError when the driver supplies no code.
            origin_args = getattr(getattr(exc, "orig", None), "args", None)
            code = origin_args[0] if origin_args else None
            if code == 1213 and attempt < max_retries - 1:
                time.sleep(retry_delay)
                continue
            # Not a deadlock, or retries exhausted — propagate.
            raise