diff --git a/DB.py b/DB.py
index a171338..ddbfd92 100644
--- a/DB.py
+++ b/DB.py
@@ -676,10 +676,10 @@ class DBSA:
     FLUSH_EVERY_ROWS = 100
     FLUSH_INTERVAL = 30
 
-    _buf_op: list = []
-    _buf_vid: list = []
-    _buf_payload: list = []
-    _last_flush: float = time.time()
+    _buf_op = []
+    _buf_vid = []
+    _buf_payload = []
+    _last_flush = time.time()
     _lock = threading.Lock()
 
     @staticmethod
@@ -713,12 +713,13 @@ class DBSA:
             "batch": data["batch"],
             "machine": data.get("machine_id", 0),
             "is_piracy": data.get("is_piracy", '3'),
-            "ts_status": 1,  # 默认值
+            "ts_status": 1,
         }
 
         vid_row = {
             "v_id": data["v_id"],
             "v_xid": data["v_xid"],
+            "rn": data["rn"],
             "v_name": data["v_name"],
             "title": data["title"],
             "link": data["link"],
@@ -739,16 +740,19 @@ class DBSA:
             "watch_number": data.get("view", 0),
             "follow_number": data.get("fans", 0),
             "video_number": data.get("videos", 0),
+            "ts_status": 1,
+            "is_repeat": 3,
         }
 
         with cls._lock:
            cls._buf_op.append(op_row)
            cls._buf_vid.append(vid_row)
            cls._buf_payload.append(data)
+            logger.debug(f"加入操作记录: {op_row}")
+            logger.debug(f"加入视频记录: {vid_row}")
 
-            buf_len = len(cls._buf_vid)
-            if buf_len >= cls.FLUSH_EVERY_ROWS or time.time() - cls._last_flush >= cls.FLUSH_INTERVAL:
-                logger.info("DBSA 落盘 ...")
+            if len(cls._buf_vid) >= cls.FLUSH_EVERY_ROWS or time.time() - cls._last_flush >= cls.FLUSH_INTERVAL:
+                logger.info("DBSA 落盘 (ROWS/TIME) ...")
                 cls.flush()
 
     @classmethod
@@ -765,9 +769,12 @@ class DBSA:
         if not op_rows and not vid_rows:
             return
 
-        # 写入作者表
-        authors_map = {}
+        logger.info("DBSA 开始 flush ...")
+        logger.debug(f"flush 操作记录数: {len(op_rows)}")
+        logger.debug(f"flush 视频记录数: {len(vid_rows)}")
+        now_ts = int(time.time())
+        authors_map = {}
 
         for data in payloads:
             u_xid = data.get("u_xid")
             if not u_xid:
@@ -786,18 +793,20 @@ class DBSA:
             }
 
         author_rows = list(authors_map.values())
+        logger.debug(f"flush 作者记录数: {len(author_rows)}")
+
         if author_rows:
-            stmt_author = mysql_insert(video_author).values(author_rows)
-            upd_author = {
-                "u_name": stmt_author.inserted.u_name,
-                "u_pic": stmt_author.inserted.u_pic,
-                "follow_number": stmt_author.inserted.follow_number,
-                "v_number": stmt_author.inserted.v_number,
-                "pv_number": stmt_author.inserted.pv_number,
-                "b_number": stmt_author.inserted.b_number,
-                "update_time": stmt_author.inserted.update_time,
+            stmt = mysql_insert(video_author).values(author_rows)
+            upd = {
+                "u_name": stmt.inserted.u_name,
+                "u_pic": stmt.inserted.u_pic,
+                "follow_number": stmt.inserted.follow_number,
+                "v_number": stmt.inserted.v_number,
+                "pv_number": stmt.inserted.pv_number,
+                "b_number": stmt.inserted.b_number,
+                "update_time": stmt.inserted.update_time,
             }
-            ondup = stmt_author.on_duplicate_key_update(**upd_author)
+            ondup = stmt.on_duplicate_key_update(**upd)
             try:
                 cls._execute_with_deadlock_retry(ondup)
             except Exception as e:
@@ -805,7 +814,6 @@ class DBSA:
                 cls.push_record_many(payloads)
                 return
 
-        # 写入操作记录表
         try:
             stmt = video_op.insert().values(op_rows)
             with _engine.begin() as conn:
@@ -816,7 +824,6 @@ class DBSA:
             cls.push_record_many(payloads)
             return
 
-        # 写入视频表(注意不包含 is_repeat / ts_status / rn)
         try:
             stmt = mysql_insert(video).values(vid_rows)
             upd = {
@@ -832,39 +839,11 @@ class DBSA:
             ondup = stmt.on_duplicate_key_update(**upd)
             with _engine.begin() as conn:
                 conn.execute(ondup)
-            logger.info(
-                f"[DBSA] flush 完成:authors={len(author_rows)} 条, "
-                f"op={len(op_rows)} 条, video={len(vid_rows)} 条"
-            )
+            logger.info(f"[DBSA] flush 完成:authors={len(author_rows)} 条, op={len(op_rows)} 条, video={len(vid_rows)} 条")
         except Exception as e:
             logger.error(f"写视频表失败: {e}")
             cls.push_record_many(payloads)
 
-    @classmethod
-    def update_video_stats(cls, locator: dict, stats: dict) -> int:
-        v_xid = locator.get("v_xid")
-        if not v_xid:
-            raise ValueError("locator 必须包含 'v_xid'")
-
-        params = dict(stats)
-        params["updatetime"] = int(time.time())
-        valid_cols = set(video.c.keys())
-        filtered_params = {k: v for k, v in params.items() if k in valid_cols}
-
-        stmt = video.update().where(video.c.v_xid == v_xid).values(**filtered_params)
-        with _engine.begin() as conn:
-            result = conn.execute(stmt)
-            return result.rowcount
-
-    @classmethod
-    def update_video_stats_async(cls, locator: dict, stats: dict) -> None:
-        thread = threading.Thread(
-            target=cls.update_video_stats,
-            args=(locator, stats),
-            daemon=True
-        )
-        thread.start()
-
     @classmethod
     def _execute_with_deadlock_retry(cls, statement):
         for attempt in range(3):
@@ -877,4 +856,4 @@ class DBSA:
                 if code == 1213 and attempt < 2:
                     time.sleep(0.5 * (attempt + 1))
                     continue
-                raise
+                raise
\ No newline at end of file
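
Reviewer note: the author-table write in flush relies on SQLAlchemy's MySQL-dialect insert and its ON DUPLICATE KEY UPDATE support. Below is a minimal, self-contained sketch of that upsert pattern for reference; the table and column definitions here are illustrative placeholders, not the project's actual video_author schema from DB.py.

import time
from sqlalchemy import MetaData, Table, Column, Integer, String, create_engine
from sqlalchemy.dialects.mysql import insert as mysql_insert

# Illustrative schema only; the real table metadata lives in DB.py.
metadata = MetaData()
video_author = Table(
    "video_author", metadata,
    Column("u_xid", String(64), primary_key=True),  # duplicate key that triggers the update path
    Column("u_name", String(255)),
    Column("follow_number", Integer),
    Column("update_time", Integer),
)

def upsert_authors(engine, author_rows):
    """INSERT new author rows, or UPDATE existing ones when the key already exists."""
    stmt = mysql_insert(video_author).values(author_rows)
    ondup = stmt.on_duplicate_key_update(
        u_name=stmt.inserted.u_name,                # reuse the value from the attempted INSERT
        follow_number=stmt.inserted.follow_number,
        update_time=stmt.inserted.update_time,
    )
    with engine.begin() as conn:
        conn.execute(ondup)

# Usage sketch (connection URL is a placeholder):
# engine = create_engine("mysql+pymysql://user:pass@host/db")
# upsert_authors(engine, [{"u_xid": "abc", "u_name": "demo",
#                          "follow_number": 1, "update_time": int(time.time())}])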