feat: 重构数据库操作以改进错误处理和重试逻辑

This commit is contained in:
晓丰 2025-05-17 00:57:52 +08:00
parent dd90cc3c91
commit bbb2cf1823

139
DB.py
View File

@ -23,7 +23,7 @@ class DBVidcon:
def __init__(self):
self.list_key = "video_kw_queue"
self.record_list_key = "record_kw_queue"
self.record_list_key = "error_save_queue"
self.urgent_list_key = "video_urgent_queue"
self.conn = pymysql.connect(**self._MYSQL_CONF)
self.cursor = self.conn.cursor()
@ -110,7 +110,6 @@ class DBVidcon:
self.redis.lpush(self.record_list_key, *raws)
def rollback_urgent(self, raws):
"""写库失败或加急任务处理失败时,把原始 JSON 退回 urgent 列表"""
if isinstance(raws, str):
raws = [raws]
try:
@ -151,67 +150,87 @@ class DBVidcon:
data.setdefault("is_repeat", 3)
data["sort"] = data.get("index", 0)
try:
select_repeat = """
SELECT is_repeat
FROM sh_dm_video_v2
WHERE rn = %(rn)s
AND v_xid = %(v_xid)s
LIMIT 1
"""
self.cursor.execute(select_repeat, data)
row = self.cursor.fetchone()
if row:
data['is_repeat'] = row[0]
max_retries = 1 # 除了第一次外,再重试一次
attempt = 0
# 2. 插入到 op 表
sql_op = """
INSERT INTO sh_dm_video_op_v2 (
v_id, v_xid, a_id, level, name_title,
keyword, rn, history_status, is_repeat,
sort, createtime, updatetime, batch, machine
) VALUES (
%(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s,
%(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s,
%(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s
)
"""
self.cursor.execute(sql_op, data)
while True:
try:
# 1) 先读 is_repeat
select_repeat = """
SELECT is_repeat
FROM sh_dm_video_v2
WHERE rn = %(rn)s
AND v_xid = %(v_xid)s
LIMIT 1
"""
self.cursor.execute(select_repeat, data)
row = self.cursor.fetchone()
if row:
data['is_repeat'] = row[0]
# 2) 插入到 op 表
sql_op = """
INSERT INTO sh_dm_video_op_v2 (
v_id, v_xid, a_id, level, name_title,
keyword, rn, history_status, is_repeat,
sort, createtime, updatetime, batch, machine
) VALUES (
%(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s,
%(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s,
%(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s
)
"""
self.cursor.execute(sql_op, data)
# 3) 删除旧表行
sql_del = """
DELETE FROM sh_dm_video_v2
WHERE rn = %(rn)s
AND v_xid = %(v_xid)s
"""
self.cursor.execute(sql_del, data)
# 4) 插入新表
sql_ins = """
INSERT INTO sh_dm_video_v2 (
v_id, v_xid, rn, v_name, title, link,
is_piracy, edition, duration,
watch_number, follow_number, video_number,
public_time, cover_pic, sort,
u_xid, u_id, u_pic, u_name,
status, createtime, updatetime,
is_repeat
) VALUES (
%(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s,
%(is_piracy)s, '', %(duration)s,
%(watch_number)s, %(fans)s, %(videos)s,
%(create_time)s, %(cover_pic)s, %(sort)s,
%(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s,
1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(),
%(is_repeat)s
)
"""
self.cursor.execute(sql_ins, data)
break # 成功跳出重试循环
except Exception as e:
# 回滚这次未提交的改动
self.conn.rollback()
print("[数据库写入异常]", str(e))
print("[出错数据]:", data)
if attempt < max_retries:
attempt += 1
print(f"{attempt + 1} 次重试…")
continue
else:
# 重试过后依然失败,推入 Redis 备用
print("重试失败,将数据写入 Redis 以便后续处理")
self.push_record(data)
print("[交由Redis处理]")
break
# 3. 删除旧表中的那行
sql_del = """
DELETE FROM sh_dm_video_v2
WHERE rn = %(rn)s
AND v_xid = %(v_xid)s
"""
self.cursor.execute(sql_del, data)
# 4. 带上 is_repeat 再插入新数据
sql_ins = """
INSERT INTO sh_dm_video_v2 (
v_id, v_xid, rn, v_name, title, link,
is_piracy, edition, duration,
watch_number, follow_number, video_number,
public_time, cover_pic, sort,
u_xid, u_id, u_pic, u_name,
status, createtime, updatetime,
is_repeat
) VALUES (
%(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s,
%(is_piracy)s, '', %(duration)s,
%(watch_number)s, %(fans)s, %(videos)s,
%(create_time)s, %(cover_pic)s, %(sort)s,
%(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s,
1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(),
%(is_repeat)s
)
"""
self.cursor.execute(sql_ins, data)
except Exception as e:
# 打印错误并回滚
print("[数据库写入异常]", str(e))
print("[出错数据]", data)
raise
def flush(self):
"""批量执行完后手动提交。"""