From 3e5ec774cabe0c502d82f1788f7527cf0a15b42e Mon Sep 17 00:00:00 2001 From: Franklin-F Date: Mon, 30 Jun 2025 21:52:42 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0=E5=8E=9F=E5=AD=90?= =?UTF-8?q?=E6=93=8D=E4=BD=9C=E4=BB=A5=E6=B8=85=E7=A9=BA=E5=B9=B6=E5=86=99?= =?UTF-8?q?=E5=85=A5report=5Fqueue=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DB.py | 329 ++++++++++++++++++---------------------------------------- 1 file changed, 100 insertions(+), 229 deletions(-) diff --git a/DB.py b/DB.py index 450efd3..619f78a 100644 --- a/DB.py +++ b/DB.py @@ -675,15 +675,13 @@ class DBVidcon: class DBSA: FLUSH_EVERY_ROWS = 100 # 行阈值 - FLUSH_INTERVAL = 30 # 秒阈值 + FLUSH_INTERVAL = 30 # 秒阈值 _buf_op: list = [] _buf_vid: list = [] _buf_payload: list = [] _last_flush: float = time.time() _lock = threading.Lock() - _existing_op_keys = set() # 用于操作记录表的索引键 - _existing_vid_keys = set() # 用于视频表的唯一索引键 @staticmethod def push_record_many(rows): @@ -693,7 +691,6 @@ class DBSA: @classmethod def upsert_video(cls, data): data = copy.deepcopy(data) - # 设置默认值 data.setdefault("a_id", 0) data.setdefault("history_status", "") data.setdefault("is_repeat", 3) @@ -702,89 +699,62 @@ class DBSA: now_ts = int(time.time()) - # 生成操作记录表的唯一索引键 - op_index_key = ( - data["rn"] or "", - data["v_name"] or "", - data["keyword"] or "", - data["v_xid"] or "", - now_ts - ) + op_row = { + "v_id": data["v_id"], + "v_xid": data["v_xid"], + "a_id": data["a_id"], + "level": data["level"], + "name_title": data["v_name"], + "keyword": data["keyword"], + "rn": data["rn"], + "history_status": data["history_status"], + "is_repeat": 3, + "sort": data["sort"], + "createtime": now_ts, + "updatetime": now_ts, + "operatetime": now_ts, + "batch": data["batch"], + "machine": data.get("machine_id", 0), + "is_piracy": data.get("is_piracy", '3'), + "ts_status": data.get("ts_status", 1), + } - # 生成视频表的唯一索引键 - vid_index_key = ( - data["v_xid"] or "", - data["v_name"] or "" - ) + vid_row = { + "v_id": data["v_id"], + "v_xid": data["v_xid"], + "v_name": data["v_name"], + "title": data["title"], + "link": data["link"], + "edition": "", + "duration": str(data["duration"]) if data.get("duration") else '0', + "public_time": data["create_time"], + "cover_pic": data["cover_pic"], + "sort": data["sort"], + "u_xid": data["u_xid"], + "u_id": data["u_id"], + "u_pic": data["u_pic"], + "u_name": data["u_name"], + "status": 1, + "createtime": now_ts, + "updatetime": now_ts, + "operatetime": now_ts, + "watch_number": data.get("view", 0), + "follow_number": data.get("fans", 0), + "video_number": data.get("videos", 0), + "is_repeat": 3, + "is_piracy": data.get("is_piracy", 3), + "ts_status": data.get("ts_status", 1), + } with cls._lock: - # 检查两个索引键是否已存在 - if op_index_key in cls._existing_op_keys: - logger.debug(f"跳过重复操作记录: {op_index_key}") - return - - if vid_index_key in cls._existing_vid_keys: - logger.debug(f"跳过重复视频记录: {vid_index_key}") - return - - # 生成操作记录行 - op_row = { - "v_id": data["v_id"], - "v_xid": data["v_xid"], - "a_id": data["a_id"], - "level": data["level"], - "name_title": data["v_name"], - "keyword": data["keyword"], - "rn": data["rn"], - "history_status": data["history_status"], - "is_repeat": data["is_repeat"], - "sort": data["sort"], - "createtime": now_ts, - "updatetime": now_ts, - "operatetime": now_ts, - "batch": data["batch"], - "machine": data.get("machine_id", 0), - "is_piracy": data.get("is_piracy", '3'), - "ts_status": data.get("ts_status", 1), - } - - # 视频行生成 - vid_row = { - "v_id": data["v_id"], - "v_xid": data["v_xid"], - "v_name": data["v_name"], - "title": data["title"], - "link": data["link"], - "edition": "", - "duration": str(data["duration"]) if data.get("duration") else '0', - "public_time": data["create_time"], - "cover_pic": data["cover_pic"], - "sort": data["sort"], - "u_xid": data["u_xid"], - "u_id": data["u_id"], - "u_pic": data["u_pic"], - "u_name": data["u_name"], - "status": 1, - "createtime": now_ts, - "updatetime": now_ts, - "operatetime": now_ts, - "watch_number": data.get("view", 0), - "follow_number": data.get("fans", 0), - "video_number": data.get("videos", 0), - "is_repeat": 0, # 初始设为0,flush时更新 - "is_piracy": data.get("is_piracy", 3), - "ts_status": data.get("ts_status", 1), - } - cls._buf_op.append(op_row) cls._buf_vid.append(vid_row) cls._buf_payload.append(data) - cls._existing_op_keys.add(op_index_key) - cls._existing_vid_keys.add(vid_index_key) - buf_len = len(cls._buf_vid) + buf_len = len(cls._buf_vid) need_flush = False flush_reason = "" + if buf_len >= cls.FLUSH_EVERY_ROWS: need_flush = True flush_reason = "ROWS" @@ -806,105 +776,17 @@ class DBSA: cls._buf_vid.clear() cls._buf_payload.clear() cls._last_flush = time.time() - existing_op_keys_batch = cls._existing_op_keys.copy() - existing_vid_keys_batch = cls._existing_vid_keys.copy() - cls._existing_op_keys.clear() - cls._existing_vid_keys.clear() if not op_rows and not vid_rows: return - # ========== 视频表重复检测 ========== - vid_existing_keys = set() - if vid_rows: - # 生成所有视频唯一键 (v_xid, v_name) - all_vid_keys = list({(row["v_xid"], row["v_name"]) for row in vid_rows}) - - if all_vid_keys: # 仅当有键时才查询 - conn = _engine.connect() - try: - # 使用SQLAlchemy 1.4+的select语法 - sel_vid = select(video.c.v_xid, video.c.v_name).where( - tuple_(video.c.v_xid, video.c.v_name).in_(all_vid_keys) - ) - result = conn.execute(sel_vid) - vid_existing_keys = {(row.v_xid, row.v_name) for row in result} - except Exception as e: - logger.error(f"查询视频表时异常: {e}") - try: - cls.push_record_many(payloads) - except Exception as re: - logger.error("[Redis 回退失败]", re) - return - finally: - conn.close() - - # 设置重复标志 + # 所有 is_repeat 统一为 3 for i, vid_row in enumerate(vid_rows): - key = (vid_row["v_xid"], vid_row["v_name"]) - vid_row["is_repeat"] = 1 if key in vid_existing_keys else 0 - # 同时设置操作记录的重复标志 + vid_row["is_repeat"] = 3 if i < len(op_rows): - op_rows[i]["is_repeat"] = vid_row["is_repeat"] + op_rows[i]["is_repeat"] = 3 - # ========== 操作记录表去重 ========== - final_op_rows = [] - if op_rows: - # 生成所有操作记录索引键 - op_check_keys = [ - (row["rn"], row["name_title"], row["keyword"], row["v_xid"], row["createtime"]) - for row in op_rows - ] - - if op_check_keys: # 仅当有键时才查询 - try: - # 查询数据库已存在的操作记录 - conn = _engine.connect() - # 使用SQLAlchemy 1.4+的select语法 - stmt = select( - video_op.c.rn, - video_op.c.name_title, - video_op.c.keyword, - video_op.c.v_xid, - video_op.c.createtime - ).where( - tuple_( - video_op.c.rn, - video_op.c.name_title, - video_op.c.keyword, - video_op.c.v_xid, - video_op.c.createtime - ).in_(op_check_keys) - ) - result = conn.execute(stmt) - existing_db_op_keys = {tuple(row) for row in result} - conn.close() - except Exception as e: - logger.error(f"查询操作记录表失败: {e}") - try: - cls.push_record_many(payloads) - except Exception as re: - logger.error("Redis退回失败", re) - return - - # 过滤重复操作记录 - for idx, row in enumerate(op_rows): - index_key = ( - row["rn"], - row["name_title"], - row["keyword"], - row["v_xid"], - row["createtime"] - ) - - # 检查是否在数据库或当前批次中重复 - if index_key not in existing_db_op_keys and index_key not in existing_op_keys_batch: - final_op_rows.append(row) - existing_op_keys_batch.add(index_key) - else: - final_op_rows = op_rows[:] # 如果没有键,直接使用所有行 - - # ========== 作者表处理 ========== + # 处理作者表 authors_map = {} now_ts = int(time.time()) for data in payloads: @@ -948,62 +830,56 @@ class DBSA: logger.error("[Redis 回退失败]", re) return - # ========== 操作记录表写入 ========== - if final_op_rows: + # 插入操作记录表 + try: + op_rows.sort(key=lambda x: ( + x['rn'] or '', + x['name_title'] or '', + x['keyword'] or '', + x['v_xid'] or '', + x['createtime'] + )) + stmt = video_op.insert().values(op_rows) + with _engine.begin() as conn: + conn.execute(stmt) + logger.info(f"插入操作记录: {len(op_rows)} 条") + except Exception as e: + logger.error(f"插入操作记录失败: {e}") try: - # 按索引字段排序优化插入性能 - final_op_rows.sort(key=lambda x: ( - x['rn'] or '', - x['name_title'] or '', - x['keyword'] or '', - x['v_xid'] or '', - x['createtime'] - )) + cls.push_record_many(payloads) + except Exception as re: + logger.error("[Redis 回退失败]", re) + return - stmt = video_op.insert().values(final_op_rows) - with _engine.begin() as conn: - conn.execute(stmt) - logger.info(f"插入操作记录: {len(final_op_rows)} 条") - except Exception as e: - logger.error(f"插入操作记录失败: {e}") - try: - cls.push_record_many(payloads) - except Exception as re: - logger.error("[Redis 回退失败]", re) - return - - # ========== 视频表写入 ========== - if vid_rows: + # 插入/更新视频表 + try: + stmt = mysql_insert(video).values(vid_rows) + upd = { + "title": stmt.inserted.title, + "duration": stmt.inserted.duration, + "cover_pic": stmt.inserted.cover_pic, + "sort": stmt.inserted.sort, + "updatetime": stmt.inserted.updatetime, + "watch_number": stmt.inserted.watch_number, + "follow_number": stmt.inserted.follow_number, + "video_number": stmt.inserted.video_number, + "is_repeat": stmt.inserted.is_repeat, + "is_piracy": stmt.inserted.is_piracy, + "ts_status": stmt.inserted.ts_status, + } + ondup = stmt.on_duplicate_key_update(**upd) + with _engine.begin() as conn: + conn.execute(ondup) + logger.info( + f"[DBSA] flush 完成:authors={len(author_rows)} 条, " + f"op={len(op_rows)} 条, video={len(vid_rows)} 条" + ) + except Exception as e: + logger.error(f"写视频表失败: {e}") try: - # 使用新的唯一键 (v_xid, v_name) 进行 upsert - stmt = mysql_insert(video).values(vid_rows) - upd = { - "title": stmt.inserted.title, - "duration": stmt.inserted.duration, - "cover_pic": stmt.inserted.cover_pic, - "sort": stmt.inserted.sort, - "updatetime": stmt.inserted.updatetime, - "watch_number": stmt.inserted.watch_number, - "follow_number": stmt.inserted.follow_number, - "video_number": stmt.inserted.video_number, - "is_repeat": stmt.inserted.is_repeat, - "is_piracy": stmt.inserted.is_piracy, - "ts_status": stmt.inserted.ts_status, - } - ondup = stmt.on_duplicate_key_update(**upd) - with _engine.begin() as conn: - conn.execute(ondup) - - logger.info( - f"[DBSA] flush 完成:authors={len(author_rows)} 条, " - f"op={len(final_op_rows)} 条, video={len(vid_rows)} 条" - ) - except Exception as e: - logger.error(f"写视频表失败: {e}") - try: - cls.push_record_many(payloads) - except Exception as re: - logger.error("[Redis 回退失败]", re) + cls.push_record_many(payloads) + except Exception as re: + logger.error("[Redis 回退失败]", re) @classmethod def update_video_stats(cls, locator: dict, stats: dict) -> int: @@ -1014,7 +890,6 @@ class DBSA: params = dict(stats) params["updatetime"] = int(time.time()) - # 过滤只保留 video 表中存在的列 valid_cols = set(video.c.keys()) filtered_params = {k: v for k, v in params.items() if k in valid_cols} @@ -1030,9 +905,6 @@ class DBSA: @classmethod def update_video_stats_async(cls, locator: dict, stats: dict) -> None: - """ - 异步更新 sh_dm_video_v3 表中的统计字段,立即返回,不阻塞调用线程。 - """ thread = threading.Thread( target=cls.update_video_stats, args=(locator, stats), @@ -1048,9 +920,8 @@ class DBSA: conn.execute(statement) return except OperationalError as e: - # 检查是否为死锁错误 code = getattr(e.orig, "args", [None])[0] - if code == 1213 and attempt < 2: # 1213 是MySQL死锁错误码 + if code == 1213 and attempt < 2: time.sleep(0.5 * (attempt + 1)) continue raise \ No newline at end of file