From 3bfb71d64a5485aa6daefafaba433f1cff15981b Mon Sep 17 00:00:00 2001 From: xiaofeng wang Date: Thu, 17 Jul 2025 10:55:33 +0800 Subject: [PATCH] refactor: improve SQL statement formatting and enhance code readability --- DB.py | 363 +++++++++++++++++++++++++++++++--------------------------- 1 file changed, 192 insertions(+), 171 deletions(-) diff --git a/DB.py b/DB.py index 1e34ed4..eaaf227 100644 --- a/DB.py +++ b/DB.py @@ -29,51 +29,51 @@ _engine = create_engine( _meta = MetaData() # 操作记录表 video_op = Table("sh_dm_video_op_v3", _meta, - Column("id", Integer, primary_key=True, autoincrement=True), - Column("v_id", String(64)), - Column("v_xid", String(64)), - Column("a_id", Integer, default=0), - Column("level", Integer), - Column("name_title", String(100)), - Column("keyword", String(100)), - Column("rn", String(50)), - Column("history_status", String(100)), - Column("is_repeat", Integer), - Column("is_piracy", String(2), default='3'), - Column("sort", Integer), - Column("createtime", Integer), - Column("updatetime", Integer), - Column("operatetime", Integer), - Column("batch", Integer), - Column("machine", Integer), - Column("ts_status", Integer, default=1), -) + Column("id", Integer, primary_key=True, autoincrement=True), + Column("v_id", String(64)), + Column("v_xid", String(64)), + Column("a_id", Integer, default=0), + Column("level", Integer), + Column("name_title", String(100)), + Column("keyword", String(100)), + Column("rn", String(50)), + Column("history_status", String(100)), + Column("is_repeat", Integer), + Column("is_piracy", String(2), default='3'), + Column("sort", Integer), + Column("createtime", Integer), + Column("updatetime", Integer), + Column("operatetime", Integer), + Column("batch", Integer), + Column("machine", Integer), + Column("ts_status", Integer, default=1), + ) # 视频表 video = Table("sh_dm_video_v3", _meta, - Column("id", Integer, primary_key=True, autoincrement=True), - Column("v_id", String(64)), - Column("v_name", String(100), nullable=False), - Column("v_xid", String(64), nullable=False), - Column("title", String(255), nullable=False), - Column("link", String(255), nullable=False), - Column("edition", String(255), default=''), - Column("duration", String(11), default='0'), - Column("watch_number", Integer, default=0), - Column("follow_number", Integer, default=0), - Column("video_number", Integer, default=0), - Column("public_time", DateTime), - Column("cover_pic", String(255)), - Column("sort", Integer), - Column("u_xid", String(64)), - Column("u_id", String(100)), - Column("u_pic", String(255)), - Column("u_name", String(255)), - Column("status", Integer, default=1), - Column("createtime", Integer, default=0), - Column("updatetime", Integer, default=0), - Column("operatetime", Integer, default=0), -) + Column("id", Integer, primary_key=True, autoincrement=True), + Column("v_id", String(64)), + Column("v_name", String(100), nullable=False), + Column("v_xid", String(64), nullable=False), + Column("title", String(255), nullable=False), + Column("link", String(255), nullable=False), + Column("edition", String(255), default=''), + Column("duration", String(11), default='0'), + Column("watch_number", Integer, default=0), + Column("follow_number", Integer, default=0), + Column("video_number", Integer, default=0), + Column("public_time", DateTime), + Column("cover_pic", String(255)), + Column("sort", Integer), + Column("u_xid", String(64)), + Column("u_id", String(100)), + Column("u_pic", String(255)), + Column("u_name", String(255)), + Column("status", Integer, default=1), + Column("createtime", Integer, default=0), + Column("updatetime", Integer, default=0), + Column("operatetime", Integer, default=0), + ) # 作者表 video_author = Table( "sh_dm_video_author", @@ -116,7 +116,8 @@ def mysql_retry(max_retries: int = 3, base_delay: float = 1.0): }.get(errno, f"MySQL错误{errno}") wait = base_delay * (2 ** (attempt - 1)) - logger.warning(f"[MySQL][{fn.__name__}] 第{attempt}次重试({errno} {reason}):{e},等待 {wait:.1f}s...") + logger.warning( + f"[MySQL][{fn.__name__}] 第{attempt}次重试({errno} {reason}):{e},等待 {wait:.1f}s...") # 仅对断连类错误尝试重连 if errno in {2013, 2006}: @@ -128,8 +129,10 @@ def mysql_retry(max_retries: int = 3, base_delay: float = 1.0): raise return wrapper + return decorator + def redis_retry(max_retries: int = 3): """ 装饰器工厂:指定最大重试次数。 @@ -175,6 +178,7 @@ class DBVidcon: self.l0_list_key = "video_l0_queue" self.l1_list_key = "video_l1_queue" self.l2_list_key = "video_l2_queue" + self.web_list_key = "video_web_queue" self.report_list = "report_queue" self.error_list_key = "error_save_queue" self.conn = pymysql.connect(**self._MYSQL_CONF) @@ -257,6 +261,13 @@ class DBVidcon: self.redis.lpush(self.l2_list_key, *raws) logger.info(f"[写入l2] 已推入 {len(raws)} 条") + @redis_retry(max_retries=3) + def push_web(self, raws): + if isinstance(raws, str): + raws = [raws] + self.redis.lpush(self.web_list_key, *raws) + logger.info(f"[写入web] 已推入 {len(raws)} 条") + @redis_retry(max_retries=3) def push_report(self, raws): """原子操作:清空列表并写入新数据""" @@ -318,6 +329,17 @@ class DBVidcon: items = [] return items + @redis_retry(max_retries=3) + def get_web_items(self, count: int = 1): + try: + items = self.fetch_from_redis(count, list_key=self.web_list_key) + except Exception as e: + logger.info("[Redis web pop error]", e) + self.reconnect_redis() + items = [] + return items + + @redis_retry(max_retries=3) def item_keyword(self, count: int = 20): try: @@ -373,21 +395,28 @@ class DBVidcon: @mysql_retry() def get_report_video(self): sql = """ - SELECT - id, - name_title, - link - FROM - sh_dm_fight_records - WHERE - status = 1 - """ + SELECT id, + name_title, + link + FROM sh_dm_fight_records + WHERE status = 1 \ + """ self.cursor.execute(sql) return self.cursor.fetchall() @mysql_retry() - def get_subsequent_report_video(self,did: int): + def get_subsequent_report_video(self, did: int): sql = """ +<<<<<<< HEAD + SELECT DISTINCT report_id + FROM sh_dm_fight_records + WHERE status = 2 + AND subsequent_status = 1 + AND report_time != '' + AND mid = %s \ + """ + self.cursor.execute(sql, did) +======= SELECT DISTINCT report_id FROM sh_dm_fight_records WHERE @@ -397,29 +426,29 @@ class DBVidcon: AND mid = %s """ self.cursor.execute(sql,did) +>>>>>>> origin/main return self.cursor.fetchall() @mysql_retry() def getreport_video(self): sql = """ - SELECT - id, - v_xid - FROM - sh_dm_fight_records - WHERE - is_removed = '' or is_removed IS NULL or is_removed = 0 - """ + SELECT id, + v_xid + FROM sh_dm_fight_records + WHERE is_removed = '' + or is_removed IS NULL + or is_removed = 0 \ + """ self.cursor.execute(sql) return self.cursor.fetchall() @mysql_retry() def mark_video_removed(self, d_id: int, removed_flag: int = 1): sql = """ - UPDATE sh_dm_fight_records - SET is_removed = %s - WHERE id = %s - """ + UPDATE sh_dm_fight_records + SET is_removed = %s + WHERE id = %s \ + """ self.cursor.execute(sql, (removed_flag, d_id)) self.flush() @@ -464,11 +493,12 @@ class DBVidcon: @mysql_retry() def update_video_ts_status(self): sql = """ - UPDATE sh_dm_video_v2 v - JOIN sh_dm_video_author a ON v.u_xid = a.u_xid - SET v.ts_status = 3 - WHERE a.white_status = 1; - """ + UPDATE sh_dm_video_v2 v + JOIN sh_dm_video_author a + ON v.u_xid = a.u_xid + SET v.ts_status = 3 + WHERE a.white_status = 1; \ + """ self.cursor.execute(sql) self.flush() logger.info("[更新视频举报状态] 已执行完毕") @@ -488,38 +518,32 @@ class DBVidcon: while True: try: sql_op = """ - INSERT INTO sh_dm_video_op_v2 ( - v_id, v_xid, a_id, level, name_title, - keyword, rn, history_status, is_repeat, - sort, createtime, updatetime, batch, machine - ) VALUES ( - %(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s, - %(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s, - %(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s - ) - """ + INSERT INTO sh_dm_video_op_v2 (v_id, v_xid, a_id, level, name_title, + keyword, rn, history_status, is_repeat, + sort, createtime, updatetime, batch, machine) + VALUES (%(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s, + %(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s, + %(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s) \ + """ self.cursor.execute(sql_op, data) sql_update = """ - INSERT INTO sh_dm_video_v2 ( - v_id, v_xid, rn, v_name, title, link, - edition, duration, - public_time, cover_pic, sort, - u_xid, u_id, u_pic, u_name, - status, createtime, updatetime - ) VALUES ( - %(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s, - '', %(duration)s, - %(create_time)s, %(cover_pic)s, %(sort)s, - %(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s, - 1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP() - ) - ON DUPLICATE KEY UPDATE - title = VALUES(title), - duration = VALUES(duration), - cover_pic = VALUES(cover_pic), - sort = VALUES(sort), - updatetime = UNIX_TIMESTAMP(); - """ + INSERT INTO sh_dm_video_v2 (v_id, v_xid, rn, v_name, title, link, + edition, duration, + public_time, cover_pic, sort, + u_xid, u_id, u_pic, u_name, + status, createtime, updatetime) + VALUES (%(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s, + '', %(duration)s, + %(create_time)s, %(cover_pic)s, %(sort)s, + %(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s, + 1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP()) ON DUPLICATE KEY + UPDATE + title = + VALUES (title), duration = + VALUES (duration), cover_pic = + VALUES (cover_pic), sort = + VALUES (sort), updatetime = UNIX_TIMESTAMP(); \ + """ self.cursor.execute(sql_update, data) break # 成功跳出重试循环 @@ -545,33 +569,26 @@ class DBVidcon: @mysql_retry() def fetch_keyword_title(self, level: int = 99): sql = """ - SELECT - k.keyword, - k.rn, - t.title AS v_name, - ANY_VALUE(t.level) AS level - FROM - sh_dm_keyword k - LEFT JOIN - sh_dm_title t ON k.title = t.title - WHERE - k.status = 1 - AND t.status = 1 - AND NOT EXISTS ( - SELECT 1 - FROM sh_dm_black_keyword b - WHERE - ( - (b.keyword IS NULL OR b.keyword = '') AND b.title = t.title - ) - OR ( - b.keyword IS NOT NULL AND b.keyword != '' AND b.keyword = k.keyword - ) - ) - AND t.level = %s - GROUP BY - k.keyword, k.rn; - """ + SELECT k.keyword, + k.rn, + t.title AS v_name, + ANY_VALUE(t.level) AS level + FROM sh_dm_keyword k + LEFT JOIN + sh_dm_title t ON k.title = t.title + WHERE k.status = 1 + AND t.status = 1 + AND NOT EXISTS (SELECT 1 + FROM sh_dm_black_keyword b + WHERE ( + (b.keyword IS NULL OR b.keyword = '') AND b.title = t.title + ) + OR ( + b.keyword IS NOT NULL AND b.keyword != '' AND b.keyword = k.keyword + )) + AND t.level = %s + GROUP BY k.keyword, k.rn; \ + """ self.cursor.execute(sql, (level,)) return self.cursor.fetchall() @@ -592,10 +609,10 @@ class DBVidcon: start_ts = int(time.time()) sql = """ - INSERT INTO sh_dm_batch_log - (batch, info, t0, t1, t2, starttime) - VALUES (%s, %s, %s, %s, %s, %s) - """ + INSERT INTO sh_dm_batch_log + (batch, info, t0, t1, t2, starttime) + VALUES (%s, %s, %s, %s, %s, %s) \ + """ try: self.cursor.execute(sql, (batch, level, t0, t1, t2, start_ts)) self.conn.commit() @@ -666,6 +683,10 @@ class DBVidcon: def l2_empty(self) -> bool: return self.redis.llen(self.l2_list_key) == 0 + @redis_retry(max_retries=3) + def web_empty(self) -> bool: + return self.redis.llen(self.web_list_key) == 0 + @redis_retry(max_retries=3) def pop_error_item(self): """ @@ -679,30 +700,30 @@ class DBVidcon: class DBSA: # ======= 可调参数 ======= - FLUSH_EVERY_ROWS = 100 # 行数阈值 - FLUSH_INTERVAL = 30 # 秒阈值 - MAX_SQL_RETRY = 3 # SQL 死锁自旋 - SQL_RETRY_BASE_SLEEP = 0.5 # 自旋退避基数 - FLUSH_RETRY = 3 # flush 整体轮次 - DELAY_ON_FAIL = 10 # flush 失败等待 - DEADLOCK_ERRNO = 1213 # MySQL 死锁码 - LOCK_TIMEOUT = 3 # 互斥锁超时 + FLUSH_EVERY_ROWS = 100 # 行数阈值 + FLUSH_INTERVAL = 30 # 秒阈值 + MAX_SQL_RETRY = 3 # SQL 死锁自旋 + SQL_RETRY_BASE_SLEEP = 0.5 # 自旋退避基数 + FLUSH_RETRY = 3 # flush 整体轮次 + DELAY_ON_FAIL = 10 # flush 失败等待 + DEADLOCK_ERRNO = 1213 # MySQL 死锁码 + LOCK_TIMEOUT = 3 # 互斥锁超时 # ======================== # ----- 缓冲区 ----- - _buf_op = [] - _buf_vid = [] + _buf_op = [] + _buf_vid = [] _buf_payload = [] - _last_flush = time.time() + _last_flush = time.time() # ----- 并发控制 ----- - _lock = threading.Lock() - _existing_op_keys = set() + _lock = threading.Lock() + _existing_op_keys = set() _existing_vid_keys = set() # ----- queue / 后台线程模式 ----- _queue_mode = False - _queue = Queue() + _queue = Queue() # ================== 退回 Redis 模拟 ================== @staticmethod @@ -726,7 +747,7 @@ class DBSA: data["sort"] = data.get("index", 0) now_ts = int(time.time()) - op_index_key = (data["v_xid"] or "", data["keyword"] or "", now_ts) + op_index_key = (data["v_xid"] or "", data["keyword"] or "", now_ts) vid_index_key = (data["v_xid"] or "", data["v_name"] or "") # ---------- ① 获取互斥锁 ---------- @@ -748,8 +769,8 @@ class DBSA: keyword=data["keyword"], is_repeat=data["is_repeat"], sort=data["sort"], - createtime=now_ts, # 首次插入 - updatetime=now_ts, # 后续更新只改这一列 + createtime=now_ts, # 首次插入 + updatetime=now_ts, # 后续更新只改这一列 batch=data.get("batch", 0), machine=data.get("machine_id", 0), is_piracy=data.get("is_piracy", '3'), @@ -850,9 +871,9 @@ class DBSA: if not cls._lock.acquire(timeout=cls.LOCK_TIMEOUT): raise RuntimeError("flush 未取得 cls._lock,可能死锁") try: - op_rows = cls._buf_op[:] - vid_rows = cls._buf_vid[:] - payloads = cls._buf_payload[:] + op_rows = cls._buf_op[:] + vid_rows = cls._buf_vid[:] + payloads = cls._buf_payload[:] cls._clear_buffers() finally: cls._lock.release() @@ -894,9 +915,9 @@ class DBSA: if op_rows: stmt_op = mysql_insert(video_op).values(op_rows) ondup_op = stmt_op.on_duplicate_key_update( - updatetime = stmt_op.inserted.updatetime, + updatetime=stmt_op.inserted.updatetime, # ts_status = stmt_op.inserted.ts_status, - is_repeat = stmt_op.inserted.is_repeat, + is_repeat=stmt_op.inserted.is_repeat, ) cls._safe_execute(ondup_op, desc="video_op") logger.info("落表:操作记录 %d 条", len(op_rows)) @@ -905,23 +926,23 @@ class DBSA: if vid_rows: stmt_vid = mysql_insert(video).values(vid_rows) ondup_vid = stmt_vid.on_duplicate_key_update( - title = stmt_vid.inserted.title, - v_name = stmt_vid.inserted.v_name, - link = stmt_vid.inserted.link, - edition = stmt_vid.inserted.edition, - duration = stmt_vid.inserted.duration, - watch_number = stmt_vid.inserted.watch_number, - follow_number = stmt_vid.inserted.follow_number, - video_number = stmt_vid.inserted.video_number, - public_time = stmt_vid.inserted.public_time, - cover_pic = stmt_vid.inserted.cover_pic, - sort = stmt_vid.inserted.sort, - u_xid = stmt_vid.inserted.u_xid, - u_id = stmt_vid.inserted.u_id, - u_pic = stmt_vid.inserted.u_pic, - u_name = stmt_vid.inserted.u_name, - status = stmt_vid.inserted.status, - updatetime = stmt_vid.inserted.updatetime, + title=stmt_vid.inserted.title, + v_name=stmt_vid.inserted.v_name, + link=stmt_vid.inserted.link, + edition=stmt_vid.inserted.edition, + duration=stmt_vid.inserted.duration, + watch_number=stmt_vid.inserted.watch_number, + follow_number=stmt_vid.inserted.follow_number, + video_number=stmt_vid.inserted.video_number, + public_time=stmt_vid.inserted.public_time, + cover_pic=stmt_vid.inserted.cover_pic, + sort=stmt_vid.inserted.sort, + u_xid=stmt_vid.inserted.u_xid, + u_id=stmt_vid.inserted.u_id, + u_pic=stmt_vid.inserted.u_pic, + u_name=stmt_vid.inserted.u_name, + status=stmt_vid.inserted.status, + updatetime=stmt_vid.inserted.updatetime, ) cls._safe_execute(ondup_vid, desc="video") logger.info("落表:视频记录 %d 条", len(vid_rows)) @@ -985,7 +1006,7 @@ class DBSA: data["sort"] = data.get("index", 0) now_ts = int(time.time()) - op_key = (data["v_xid"] or "", data["keyword"] or "", now_ts) + op_key = (data["v_xid"] or "", data["keyword"] or "", now_ts) vid_key = (data["v_xid"] or "", data["v_name"] or "") if op_key in cls._existing_op_keys or vid_key in cls._existing_vid_keys: return