From bbb2cf18237edaeaf19d50a6e70d84a4fca46669 Mon Sep 17 00:00:00 2001 From: Franklin-F Date: Sat, 17 May 2025 00:57:52 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=87=8D=E6=9E=84=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=BA=93=E6=93=8D=E4=BD=9C=E4=BB=A5=E6=94=B9=E8=BF=9B=E9=94=99?= =?UTF-8?q?=E8=AF=AF=E5=A4=84=E7=90=86=E5=92=8C=E9=87=8D=E8=AF=95=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DB.py | 139 +++++++++++++++++++++++++++++++++------------------------- 1 file changed, 79 insertions(+), 60 deletions(-) diff --git a/DB.py b/DB.py index d31a496..98ee0d1 100644 --- a/DB.py +++ b/DB.py @@ -23,7 +23,7 @@ class DBVidcon: def __init__(self): self.list_key = "video_kw_queue" - self.record_list_key = "record_kw_queue" + self.record_list_key = "error_save_queue" self.urgent_list_key = "video_urgent_queue" self.conn = pymysql.connect(**self._MYSQL_CONF) self.cursor = self.conn.cursor() @@ -110,7 +110,6 @@ class DBVidcon: self.redis.lpush(self.record_list_key, *raws) def rollback_urgent(self, raws): - """写库失败或加急任务处理失败时,把原始 JSON 退回 urgent 列表""" if isinstance(raws, str): raws = [raws] try: @@ -151,67 +150,87 @@ class DBVidcon: data.setdefault("is_repeat", 3) data["sort"] = data.get("index", 0) - try: - select_repeat = """ - SELECT is_repeat - FROM sh_dm_video_v2 - WHERE rn = %(rn)s - AND v_xid = %(v_xid)s - LIMIT 1 - """ - self.cursor.execute(select_repeat, data) - row = self.cursor.fetchone() - if row: - data['is_repeat'] = row[0] + max_retries = 1 # 除了第一次外,再重试一次 + attempt = 0 - # 2. 插入到 op 表 - sql_op = """ - INSERT INTO sh_dm_video_op_v2 ( - v_id, v_xid, a_id, level, name_title, - keyword, rn, history_status, is_repeat, - sort, createtime, updatetime, batch, machine - ) VALUES ( - %(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s, - %(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s, - %(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s - ) - """ - self.cursor.execute(sql_op, data) + while True: + try: + # 1) 先读 is_repeat + select_repeat = """ + SELECT is_repeat + FROM sh_dm_video_v2 + WHERE rn = %(rn)s + AND v_xid = %(v_xid)s + LIMIT 1 + """ + self.cursor.execute(select_repeat, data) + row = self.cursor.fetchone() + if row: + data['is_repeat'] = row[0] + + # 2) 插入到 op 表 + sql_op = """ + INSERT INTO sh_dm_video_op_v2 ( + v_id, v_xid, a_id, level, name_title, + keyword, rn, history_status, is_repeat, + sort, createtime, updatetime, batch, machine + ) VALUES ( + %(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s, + %(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s, + %(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s + ) + """ + self.cursor.execute(sql_op, data) + + # 3) 删除旧表行 + sql_del = """ + DELETE FROM sh_dm_video_v2 + WHERE rn = %(rn)s + AND v_xid = %(v_xid)s + """ + self.cursor.execute(sql_del, data) + + # 4) 插入新表 + sql_ins = """ + INSERT INTO sh_dm_video_v2 ( + v_id, v_xid, rn, v_name, title, link, + is_piracy, edition, duration, + watch_number, follow_number, video_number, + public_time, cover_pic, sort, + u_xid, u_id, u_pic, u_name, + status, createtime, updatetime, + is_repeat + ) VALUES ( + %(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s, + %(is_piracy)s, '', %(duration)s, + %(watch_number)s, %(fans)s, %(videos)s, + %(create_time)s, %(cover_pic)s, %(sort)s, + %(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s, + 1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), + %(is_repeat)s + ) + """ + self.cursor.execute(sql_ins, data) + break # 成功跳出重试循环 + + except Exception as e: + # 回滚这次未提交的改动 + self.conn.rollback() + print("[数据库写入异常]", str(e)) + print("[出错数据]:", data) + + if attempt < max_retries: + attempt += 1 + print(f"第 {attempt + 1} 次重试…") + continue + else: + # 重试过后依然失败,推入 Redis 备用 + print("重试失败,将数据写入 Redis 以便后续处理") + self.push_record(data) + print("[交由Redis处理]") + break - # 3. 删除旧表中的那行 - sql_del = """ - DELETE FROM sh_dm_video_v2 - WHERE rn = %(rn)s - AND v_xid = %(v_xid)s - """ - self.cursor.execute(sql_del, data) - # 4. 带上 is_repeat 再插入新数据 - sql_ins = """ - INSERT INTO sh_dm_video_v2 ( - v_id, v_xid, rn, v_name, title, link, - is_piracy, edition, duration, - watch_number, follow_number, video_number, - public_time, cover_pic, sort, - u_xid, u_id, u_pic, u_name, - status, createtime, updatetime, - is_repeat - ) VALUES ( - %(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s, - %(is_piracy)s, '', %(duration)s, - %(watch_number)s, %(fans)s, %(videos)s, - %(create_time)s, %(cover_pic)s, %(sort)s, - %(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s, - 1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), - %(is_repeat)s - ) - """ - self.cursor.execute(sql_ins, data) - except Exception as e: - # 打印错误并回滚 - print("[数据库写入异常]", str(e)) - print("[出错数据]", data) - raise def flush(self): """批量执行完后手动提交。"""