diff --git a/DB.py b/DB.py
index 873279c..ec66d53 100644
--- a/DB.py
+++ b/DB.py
@@ -665,22 +665,33 @@ class DBVidcon:
         return json.loads(item) if item is not None else None
 
 
 class DBSA:
-    FLUSH_EVERY_ROWS = 100
-    FLUSH_INTERVAL = 30
+    # ---------- Tunable parameters ----------
+    FLUSH_EVERY_ROWS = 100  # flush once this many rows are buffered
+    FLUSH_INTERVAL = 30  # ...or once this many seconds have passed
+    MAX_SQL_RETRY = 3  # attempts per statement when MySQL reports a deadlock
+    SQL_RETRY_BASE_SLEEP = 0.5  # first deadlock back-off (s); doubles per retry
+    FLUSH_RETRY = 3  # attempts per batch before it is handed back
+    DELAY_ON_FAIL = 10  # seconds to wait between failed batch attempts
+    DEADLOCK_ERRNO = 1213  # MySQL deadlock error number
+    # -----------------------------------------
+
     _buf_op = []
     _buf_vid = []
     _buf_payload = []
     _last_flush = time.time()
     _lock = threading.Lock()
     _existing_op_keys = set()
     _existing_vid_keys = set()
 
+    # ---------------- Public interface ----------------
     @staticmethod
     def push_record_many(rows):
+        """Placeholder for returning failed rows to Redis; only logs the count."""
         logger.info(f"[退回Redis] cnt={len(rows)}")
 
 
+    # ---------------- Write entry point ----------------
     @classmethod
     def upsert_video(cls, data):
         data = copy.deepcopy(data)
@@ -691,25 +702,19 @@ class DBSA:
 
         now_ts = int(time.time())
 
-        op_index_key = (
-            data["v_xid"] or "",
-            data["keyword"] or "",
-            now_ts
-        )
-        vid_index_key = (
-            data["v_xid"] or "",
-            data["title"] or ""
-        )
+        op_index_key = (data["v_xid"] or "", data["keyword"] or "", now_ts)
+        vid_index_key = (data["v_xid"] or "", data["title"] or "")
 
+        # ---------- Buffer the row while holding the lock ----------
         with cls._lock:
             if op_index_key in cls._existing_op_keys:
                 logger.debug(f"跳过重复操作记录: {op_index_key}")
                 return
-
             if vid_index_key in cls._existing_vid_keys:
                 logger.debug(f"跳过重复视频记录: {vid_index_key}")
                 return
 
+            # Build op_row and vid_row
             op_row = {
                 "v_id": data["v_id"],
                 "v_xid": data["v_xid"],
@@ -750,39 +755,107 @@ class DBSA:
                 "watch_number": data.get("view", 0),
                 "follow_number": data.get("fans", 0),
                 "video_number": data.get("videos", 0),
-                # "ts_status": data.get("ts_status", 1),
             }
 
             # 只保留 video 表中合法字段
             video_fields = {c.name for c in video.columns}
             vid_row = {k: v for k, v in vid_row.items() if k in video_fields}
 
+            # Append to the buffers
             cls._buf_op.append(op_row)
             cls._buf_vid.append(vid_row)
             cls._buf_payload.append(data)
             cls._existing_op_keys.add(op_index_key)
             cls._existing_vid_keys.add(vid_index_key)
 
-        if len(cls._buf_vid) >= cls.FLUSH_EVERY_ROWS or time.time() - cls._last_flush >= cls.FLUSH_INTERVAL:
+        # Flush when either threshold is reached
+        if (len(cls._buf_vid) >= cls.FLUSH_EVERY_ROWS
+                or time.time() - cls._last_flush >= cls.FLUSH_INTERVAL):
             logger.info("落表:达到行数或超时阈值,开始落库")
             cls.flush()
 
+    # ------------- Deadlock-safe SQL execution -------------
+    @classmethod
+    def _safe_execute(cls, statement, desc=""):
+        """Execute ``statement`` in its own transaction, retrying on deadlock.
+
+        Args:
+            statement: Executable passed to ``conn.execute``.
+            desc: Label used in the retry log line.
+
+        Raises:
+            Exception: The error from the last attempt, when it is not a
+                deadlock or ``MAX_SQL_RETRY`` attempts are used up.
+        """
+        for attempt in range(cls.MAX_SQL_RETRY):
+            try:
+                with _engine.begin() as conn:
+                    conn.execute(statement)
+                return
+            except Exception as e:
+                # Not every exception carries ``.orig``; look it up
+                # defensively so an unrelated failure is re-raised as itself
+                # instead of being replaced by an AttributeError/IndexError.
+                orig = getattr(e, "orig", None)
+                err_args = getattr(orig, "args", None) or (None,)
+                retryable = (err_args[0] == cls.DEADLOCK_ERRNO
+                             and attempt < cls.MAX_SQL_RETRY - 1)
+                if not retryable:
+                    raise
+                logger.warning("[%s] 死锁重试 %d/%d",
+                               desc, attempt + 1, cls.MAX_SQL_RETRY)
+                # Back-off doubles on every retry, starting from the base.
+                time.sleep(cls.SQL_RETRY_BASE_SLEEP * (2 ** attempt))
+
+    # ------------- Public flush (whole-batch retry) -------------
     @classmethod
     def flush(cls):
+        """Drain the buffers and write the batch, retrying it as a whole.
+
+        The buffers are drained exactly once, so every retry round writes
+        the same rows; re-running is safe because all statements are
+        upserts. After ``FLUSH_RETRY`` failed rounds the batch is handed
+        to ``push_record_many`` instead of being dropped.
+        """
         with cls._lock:
             op_rows = cls._buf_op[:]
             vid_rows = cls._buf_vid[:]
             payloads = cls._buf_payload[:]
 
+            cls._buf_op.clear()
             cls._buf_vid.clear()
             cls._buf_payload.clear()
             cls._last_flush = time.time()
             cls._existing_op_keys.clear()
             cls._existing_vid_keys.clear()
 
         if not op_rows and not vid_rows:
             return
 
+        for round_no in range(1, cls.FLUSH_RETRY + 1):
+            try:
+                cls._flush_once(op_rows, vid_rows, payloads)
+                return
+            except Exception as e:
+                logger.error("[flush] 第 %d 轮失败: %s", round_no, e)
+            if round_no < cls.FLUSH_RETRY:
+                logger.info("[flush] 等待 %ds 后重试…", cls.DELAY_ON_FAIL)
+                time.sleep(cls.DELAY_ON_FAIL)
+
+        logger.error("[flush] 连续 %d 轮失败,退回 Redis", cls.FLUSH_RETRY)
+        cls.push_record_many(payloads)
+
+    # ------------- Actual database writes -------------
+    @classmethod
+    def _flush_once(cls, op_rows, vid_rows, payloads):
+        """Write one drained batch; any exception propagates to ``flush``.
+
+        Args:
+            op_rows: Rows for the ``video_op`` upsert.
+            vid_rows: Rows for the ``video`` upsert.
+            payloads: Source records, iterated to build the author rows.
+        """
+        # --- Upsert authors ---
         authors_map = {}
         now_ts = int(time.time())
         for data in payloads:
@@ -802,83 +875,54 @@ class DBSA:
                 "update_time": now_ts
             }
 
-        author_rows = list(authors_map.values())
-        if author_rows:
-            stmt_author = mysql_insert(video_author).values(author_rows)
+        if authors_map:
+            stmt_author = mysql_insert(video_author).values(list(authors_map.values()))
             upd_author = {
                 "u_name": stmt_author.inserted.u_name,
                 "u_pic": stmt_author.inserted.u_pic,
                 "follow_number": stmt_author.inserted.follow_number,
                 "v_number": stmt_author.inserted.v_number,
                 "pv_number": stmt_author.inserted.pv_number,
                 "b_number": stmt_author.inserted.b_number,
                 "update_time": stmt_author.inserted.update_time,
             }
             ondup_author = stmt_author.on_duplicate_key_update(**upd_author)
-            try:
-                cls._execute_with_deadlock_retry(ondup_author)
-            except Exception as e:
-                logger.error(f"写作者表失败: {e}")
-                cls.push_record_many(payloads)
-                return
+            cls._safe_execute(ondup_author, desc="video_author")
 
+        # --- Upsert video_op ---
         if op_rows:
-            try:
-                stmt = mysql_insert(video_op).values(op_rows)
-                ondup = stmt.on_duplicate_key_update(
-                    updatetime=stmt.inserted.updatetime,
-                    operatetime=stmt.inserted.operatetime,
-                    ts_status=stmt.inserted.ts_status,
-                    is_repeat=stmt.inserted.is_repeat,
-                )
-                with _engine.begin() as conn:
-                    conn.execute(ondup)
-                logger.info(f"落表:操作记录 {len(op_rows)} 条")
-            except Exception as e:
-                logger.error(f"写操作记录表失败: {e}")
-                cls.push_record_many(payloads)
-                return
+            stmt_op = mysql_insert(video_op).values(op_rows)
+            ondup_op = stmt_op.on_duplicate_key_update(
+                updatetime=stmt_op.inserted.updatetime,
+                operatetime=stmt_op.inserted.operatetime,
+                ts_status=stmt_op.inserted.ts_status,
+                is_repeat=stmt_op.inserted.is_repeat,
+            )
+            cls._safe_execute(ondup_op, desc="video_op")
+            logger.info("落表:操作记录 %d 条", len(op_rows))
 
+        # --- Upsert video ---
         if vid_rows:
-            try:
-                stmt = mysql_insert(video).values(vid_rows)
-                upd = {
-                    "title": stmt.inserted.title,
-                    "link": stmt.inserted.link,
-                    "edition": stmt.inserted.edition,
-                    "duration": stmt.inserted.duration,
-                    "watch_number": stmt.inserted.watch_number,
-                    "follow_number": stmt.inserted.follow_number,
-                    "video_number": stmt.inserted.video_number,
-                    "public_time": stmt.inserted.public_time,
-                    "cover_pic": stmt.inserted.cover_pic,
-                    "sort": stmt.inserted.sort,
-                    "u_xid": stmt.inserted.u_xid,
-                    "u_id": stmt.inserted.u_id,
-                    "u_pic": stmt.inserted.u_pic,
-                    "u_name": stmt.inserted.u_name,
-                    "status": stmt.inserted.status,
-                    # "ts_status": stmt.inserted.ts_status,
-                    "updatetime": stmt.inserted.updatetime,
-                    "operatetime": stmt.inserted.operatetime,
-                }
-                ondup = stmt.on_duplicate_key_update(**upd)
-                with _engine.begin() as conn:
-                    conn.execute(ondup)
-                logger.info(f"落表:视频记录 {len(vid_rows)} 条")
-            except Exception as e:
-                logger.error(f"写视频表失败: {e}")
-                cls.push_record_many(payloads)
-
-    @classmethod
-    def _execute_with_deadlock_retry(cls, statement):
-        for attempt in range(3):
-            try:
-                with _engine.begin() as conn:
-                    conn.execute(statement)
-                return
-            except Exception as e:
-                if getattr(e.orig, "args", [None])[0] == 1213 and attempt < 2:
-                    time.sleep(0.5 * (attempt + 1))
-                    continue
-                raise
+            stmt_vid = mysql_insert(video).values(vid_rows)
+            upd = {
+                "title": stmt_vid.inserted.title,
+                "link": stmt_vid.inserted.link,
+                "edition": stmt_vid.inserted.edition,
+                "duration": stmt_vid.inserted.duration,
+                "watch_number": stmt_vid.inserted.watch_number,
+                "follow_number": stmt_vid.inserted.follow_number,
+                "video_number": stmt_vid.inserted.video_number,
+                "public_time": stmt_vid.inserted.public_time,
+                "cover_pic": stmt_vid.inserted.cover_pic,
+                "sort": stmt_vid.inserted.sort,
+                "u_xid": stmt_vid.inserted.u_xid,
+                "u_id": stmt_vid.inserted.u_id,
+                "u_pic": stmt_vid.inserted.u_pic,
+                "u_name": stmt_vid.inserted.u_name,
+                "status": stmt_vid.inserted.status,
+                "updatetime": stmt_vid.inserted.updatetime,
+                "operatetime": stmt_vid.inserted.operatetime,
+            }
+            ondup_vid = stmt_vid.on_duplicate_key_update(**upd)
+            cls._safe_execute(ondup_vid, desc="video")
+            logger.info("落表:视频记录 %d 条", len(vid_rows))
\ No newline at end of file