From 4141837764a1fa82692dea12a429fe9898dfa1fd Mon Sep 17 00:00:00 2001
From: Franklin-F
Date: Sun, 29 Jun 2025 11:06:57 +0800
Subject: [PATCH] feat: optimize database operation logic; add unique-index
 checks to avoid duplicate records
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 DB.py     | 307 ++++++++++++++++++++++++++++++++++++++----------------
 report.py |   2 +-
 2 files changed, 218 insertions(+), 91 deletions(-)

diff --git a/DB.py b/DB.py
index 6355f17..d3aa014 100644
--- a/DB.py
+++ b/DB.py
@@ -658,51 +658,112 @@ class DBSA:
     FLUSH_EVERY_ROWS = 100  # row-count threshold
     FLUSH_INTERVAL = 30     # time threshold (seconds)
 
-    _buf_op: List[Dict] = []
-    _buf_vid: List[Dict] = []
-    _buf_payload: List[Dict] = []
+    _buf_op: list = []
+    _buf_vid: list = []
+    _buf_payload: list = []
     _last_flush: float = time.time()
     _lock = threading.Lock()
+    _existing_op_keys = set()   # buffered index keys for the operation-log table
+    _existing_vid_keys = set()  # buffered unique-index keys for the video table
 
-    push_record_many = staticmethod(
-        lambda rows: logger.info("[退回Redis] cnt=", len(rows))
-    )
+    @staticmethod
+    def push_record_many(rows):
+        """Fallback stub: push rows back to Redis."""
+        logger.info(f"[pushed back to Redis] cnt={len(rows)}")
 
     @classmethod
     def upsert_video(cls, data):
         data = copy.deepcopy(data)
+        # fill in defaults
         data.setdefault("a_id", 0)
         data.setdefault("history_status", "")
-        data.setdefault("is_repeat", 3)  # 避免 KeyError
+        data.setdefault("is_repeat", 3)
+        data.setdefault("keyword", "")
         data["sort"] = data.get("index", 0)
 
         now_ts = int(time.time())
-        op_row = {
-            "v_id": data["v_id"], "v_xid": data["v_xid"], "a_id": data["a_id"],
-            "level": data["level"], "name_title": data["v_name"],
-            "keyword": data["keyword"], "rn": data["rn"],
-            "history_status": data["history_status"], "is_repeat": data["is_repeat"],
-            "sort": data["sort"], "createtime": now_ts, "updatetime": now_ts,
-            "batch": data["batch"], "machine": data["machine_id"],
-        }
-        vid_row = {
-            "v_id": data["v_id"], "v_xid": data["v_xid"], "rn": data["rn"],
-            "v_name": data["v_name"], "title": data["title"], "link": data["link"],
-            "edition": "", "duration": data["duration"],
-            "public_time": data["create_time"], "cover_pic": data["cover_pic"],
-            "sort": data["sort"], "u_xid": data["u_xid"], "u_id": data["u_id"],
-            "u_pic": data["u_pic"], "u_name": data["u_name"],
-            "status": 1, "createtime": now_ts, "updatetime": now_ts, "watch_number": data.get("view", 0),
-            "follow_number": data.get("fans", 0),
-            "video_number": data.get("videos", 0),
-        }
+
+        # build the unique-index key for the operation-log table
+        op_index_key = (
+            data["rn"] or "",
+            data["v_name"] or "",
+            data["keyword"] or "",
+            data["v_xid"] or "",
+            now_ts
+        )
+
+        # build the unique-index key for the video table
+        vid_index_key = (
+            data["v_xid"] or "",
+            data["v_name"] or ""
+        )
 
         with cls._lock:
+            # skip the row if either index key is already buffered
+            if op_index_key in cls._existing_op_keys:
+                logger.debug(f"skipping duplicate operation-log row: {op_index_key}")
+                return
+
+            if vid_index_key in cls._existing_vid_keys:
+                logger.debug(f"skipping duplicate video row: {vid_index_key}")
+                return
+
+            # build the operation-log row
+            op_row = {
+                "v_id": data["v_id"],
+                "v_xid": data["v_xid"],
+                "a_id": data["a_id"],
+                "level": data["level"],
+                "name_title": data["v_name"],
+                "keyword": data["keyword"],
+                "rn": data["rn"],
+                "history_status": data["history_status"],
+                "is_repeat": data["is_repeat"],
+                "sort": data["sort"],
+                "createtime": now_ts,
+                "updatetime": now_ts,
+                "operatetime": now_ts,
+                "batch": data["batch"],
+                "machine": data.get("machine_id", 0),
+                "is_piracy": data.get("is_piracy", "3"),
+                "ts_status": data.get("ts_status", 1),
+            }
+
+            # build the video row
+            vid_row = {
+                "v_id": data["v_id"],
+                "v_xid": data["v_xid"],
+                "rn": data["rn"],
+                "v_name": data["v_name"],
+                "title": data["title"],
+                "link": data["link"],
+                "edition": "",
+                "duration": str(data["duration"]) if data.get("duration") else "0",
+                "public_time": data["create_time"],
+                "cover_pic": data["cover_pic"],
+                "sort": data["sort"],
+                "u_xid": data["u_xid"],
+                "u_id": data["u_id"],
+                "u_pic": data["u_pic"],
+                "u_name": data["u_name"],
+                "status": 1,
+                "createtime": now_ts,
+                "updatetime": now_ts,
+                "operatetime": now_ts,
+                "watch_number": data.get("view", 0),
+                "follow_number": data.get("fans", 0),
+                "video_number": data.get("videos", 0),
+                "is_repeat": 0,  # initially 0; set during flush
+                "is_piracy": data.get("is_piracy", 3),
+                "ts_status": data.get("ts_status", 1),
+            }
+
             cls._buf_op.append(op_row)
             cls._buf_vid.append(vid_row)
-            cls._buf_payload.append(data)  # 保存原始
+            cls._buf_payload.append(data)
+            cls._existing_op_keys.add(op_index_key)
+            cls._existing_vid_keys.add(vid_index_key)
             buf_len = len(cls._buf_vid)
-            # logger.info(f"DB缓冲 -> xid={data['v_xid']}, level={data['level']}, buffer={buf_len}")
 
             need_flush = False
             flush_reason = ""
@@ -714,33 +775,9 @@ class DBSA:
                 flush_reason = "TIME"
 
             if need_flush:
-                logger.info(f"DBSA 落 ({flush_reason}) ...")
+                logger.info(f"DBSA flushing to DB ({flush_reason}) ...")
                 cls.flush()
 
-    @classmethod
-    def _bulk_insert(cls, rows: List[Dict]):
-        if not rows:
-            return
-        stmt = video_op.insert().values(rows)
-        with _engine.begin() as conn:
-            conn.execute(stmt)
-
-    @classmethod
-    def _bulk_upsert(cls, rows: List[Dict]):
-        if not rows:
-            return
-        stmt = mysql_insert(video).values(rows)
-        upd = {
-            "title": stmt.inserted.title,
-            "duration": stmt.inserted.duration,
-            "cover_pic": stmt.inserted.cover_pic,
-            "sort": stmt.inserted.sort,
-            "updatetime": stmt.inserted.updatetime,
-        }
-        ondup = stmt.on_duplicate_key_update(**upd)
-        with _engine.begin() as conn:
-            conn.execute(ondup)
-
     @classmethod
     def flush(cls):
         with cls._lock:
@@ -751,42 +788,103 @@ class DBSA:
             cls._buf_vid.clear()
             cls._buf_payload.clear()
             cls._last_flush = time.time()
+            # reset the buffered keys for the next batch; do NOT snapshot and
+            # reuse them in the filter below -- they already contain every key
+            # in this batch and would filter out all of its rows
+            cls._existing_op_keys.clear()
+            cls._existing_vid_keys.clear()
 
         if not op_rows and not vid_rows:
             return
 
-        existing_keys = set()
+        # ========== duplicate detection for the video table ==========
+        vid_existing_keys = set()
         if vid_rows:
-            all_keys = list({(row["v_xid"], row["rn"]) for row in vid_rows})
+            # collect every video unique key (v_xid, v_name)
+            all_vid_keys = list({(row["v_xid"], row["v_name"]) for row in vid_rows})
+            conn = _engine.connect()
             try:
-                sel_vid = (
-                    video.select()
-                    .with_only_columns(video.c.v_xid, video.c.rn)
-                    .where(tuple_(video.c.v_xid, video.c.rn).in_(all_keys))
+                # query video rows that already exist in the database
+                sel_vid = select(
+                    video.c.v_xid,
+                    video.c.v_name
+                ).where(
+                    tuple_(video.c.v_xid, video.c.v_name).in_(all_vid_keys)
                 )
-                existing_keys = {(row.v_xid, row.rn) for row in conn.execute(sel_vid).fetchall()}
+                result = conn.execute(sel_vid)
+                vid_existing_keys = {(row.v_xid, row.v_name) for row in result}
             except Exception as e:
-                logger.info(f"[DBSA] 查询 video 表时异常: {e}")
+                logger.error(f"querying the video table failed: {e}")
                 try:
                     cls.push_record_many(payloads)
                 except Exception as re:
-                    logger.info("[Redis 回退失败]", re)
+                    logger.error(f"[Redis fallback failed] {re}")
                 return
             finally:
                 conn.close()
 
-        # 先给日志表的 op_rows 设置 0/1:1=重复,0=不重复
+        # set the duplicate flags
         for i, vid_row in enumerate(vid_rows):
-            key = (vid_row["v_xid"], vid_row["rn"])
op_rows[i]["is_repeat"] = 1 if key in existing_keys else 0 + key = (vid_row["v_xid"], vid_row["v_name"]) + vid_row["is_repeat"] = 1 if key in vid_existing_keys else 0 + # 同时设置操作记录的重复标志 + if i < len(op_rows): + op_rows[i]["is_repeat"] = vid_row["is_repeat"] - # 再把同样的 is_repeat 值写到 vid_rows,以便视频表也能存到 0/1 - for i, vid_row in enumerate(vid_rows): - vid_row["is_repeat"] = op_rows[i]["is_repeat"] - vid_row.pop("level", None) + # ========== 操作记录表去重 ========== + final_op_rows = [] + if op_rows: + try: + # 生成所有操作记录索引键 + op_check_keys = [ + (row["rn"], row["name_title"], row["keyword"], row["v_xid"], row["createtime"]) + for row in op_rows + ] - # 以下作者表、日志表和视频表写入逻辑保持不变... + # 查询数据库已存在的操作记录 + conn = _engine.connect() + stmt = select([ + video_op.c.rn, + video_op.c.name_title, + video_op.c.keyword, + video_op.c.v_xid, + video_op.c.createtime + ]).where( + tuple_( + video_op.c.rn, + video_op.c.name_title, + video_op.c.keyword, + video_op.c.v_xid, + video_op.c.createtime + ).in_(op_check_keys) + ) + result = conn.execute(stmt) + existing_db_op_keys = {tuple(row) for row in result} + conn.close() + except Exception as e: + logger.error(f"查询操作记录表失败: {e}") + try: + cls.push_record_many(payloads) + except Exception as re: + logger.error("Redis退回失败", re) + return + + # 过滤重复操作记录 + for idx, row in enumerate(op_rows): + index_key = ( + row["rn"], + row["name_title"], + row["keyword"], + row["v_xid"], + row["createtime"] + ) + + # 检查是否在数据库或当前批次中重复 + if index_key not in existing_db_op_keys and index_key not in existing_op_keys_batch: + final_op_rows.append(row) + existing_op_keys_batch.add(index_key) + + # ========== 作者表处理 ========== authors_map = {} now_ts = int(time.time()) for data in payloads: @@ -823,36 +921,69 @@ class DBSA: try: cls._execute_with_deadlock_retry(ondup_author) except Exception as e: - logger.info(f"[DBSA] 写作者表失败(死锁重试后仍未成功): {e}") + logger.error(f"写作者表失败: {e}") try: cls.push_record_many(payloads) except Exception as re: - logger.info("[Redis 回退失败]", re) + logger.error("[Redis 回退失败]", re) return - if op_rows: + # ========== 操作记录表写入 ========== + if final_op_rows: try: - cls._bulk_insert(op_rows) + # 按索引字段排序优化插入性能 + final_op_rows.sort(key=lambda x: ( + x['rn'] or '', + x['name_title'] or '', + x['keyword'] or '', + x['v_xid'] or '', + x['createtime'] + )) + + stmt = video_op.insert().values(final_op_rows) + with _engine.begin() as conn: + conn.execute(stmt) + logger.info(f"插入操作记录: {len(final_op_rows)} 条") except Exception as e: - logger.info(f"[DBSA] 写日志表失败: {e}") + logger.error(f"插入操作记录失败: {e}") try: cls.push_record_many(payloads) except Exception as re: - logger.info("[Redis 回退失败]", re) + logger.error("[Redis 回退失败]", re) return + # ========== 视频表写入 ========== if vid_rows: try: - cls._bulk_upsert(vid_rows) + # 使用新的唯一键 (v_xid, v_name) 进行 upsert + stmt = mysql_insert(video).values(vid_rows) + upd = { + "title": stmt.inserted.title, + "duration": stmt.inserted.duration, + "cover_pic": stmt.inserted.cover_pic, + "sort": stmt.inserted.sort, + "updatetime": stmt.inserted.updatetime, + "watch_number": stmt.inserted.watch_number, + "follow_number": stmt.inserted.follow_number, + "video_number": stmt.inserted.video_number, + "is_repeat": stmt.inserted.is_repeat, + "is_piracy": stmt.inserted.is_piracy, + "ts_status": stmt.inserted.ts_status, + } + ondup = stmt.on_duplicate_key_update(**upd) + with _engine.begin() as conn: + conn.execute(ondup) + logger.info( - f"[DBSA] flush 完成:authors={len(author_rows)} 条,op={len(op_rows)} 条,video={len(vid_rows)} 条" + f"[DBSA] flush 完成:authors={len(author_rows)} 条, " + 
f"op={len(final_op_rows)} 条, video={len(vid_rows)} 条" ) except Exception as e: - logger.info(f"[DBSA] 写视频表失败: {e} op={len(op_rows)} video={len(vid_rows)}") + logger.error(f"写视频表失败: {e}") try: cls.push_record_many(payloads) except Exception as re: - logger.info("[Redis 回退失败]", re) + logger.error("[Redis 回退失败]", re) @classmethod def update_video_stats(cls, locator: dict, stats: dict) -> int: @@ -880,7 +1011,7 @@ class DBSA: @classmethod def update_video_stats_async(cls, locator: dict, stats: dict) -> None: """ - 异步更新 sh_dm_video_v2 表中的统计字段,立即返回,不阻塞调用线程。 + 异步更新 sh_dm_video_v3 表中的统计字段,立即返回,不阻塞调用线程。 """ thread = threading.Thread( target=cls.update_video_stats, @@ -893,17 +1024,13 @@ class DBSA: def _execute_with_deadlock_retry(cls, statement): for attempt in range(3): try: - with _engine.begin() as conn: conn.execute(statement) return except OperationalError as e: - # e.orig.args[0] == 1213 表示死锁 - code = None - if hasattr(e.orig, "args") and len(e.orig.args) >= 1: - code = e.orig.args[0] - if code == 1213 and attempt < 3 - 1: - time.sleep(1) + # 检查是否为死锁错误 + code = getattr(e.orig, "args", [None])[0] + if code == 1213 and attempt < 2: # 1213 是MySQL死锁错误码 + time.sleep(0.5 * (attempt + 1)) continue - # 不是死锁,或者已经重试到达上限,直接抛出 - raise + raise \ No newline at end of file diff --git a/report.py b/report.py index c8ed3bf..83aeb5b 100644 --- a/report.py +++ b/report.py @@ -94,7 +94,7 @@ while True: ) db.flush() except Exception as e: - logger.logger.error(f"ID:{li['id']}, e:{e}") + logger.error(f"ID:{li['id']}, e:{e}") db.update_fight_record_status(li['id'], 0, 3, str(e), mid=MACHINE_ID) time.sleep(60) # 出错延迟