diff --git a/DB.py b/DB.py
index 619f78a..a171338 100644
--- a/DB.py
+++ b/DB.py
@@ -26,7 +26,7 @@ _engine = create_engine(
     future=True,
 )
 _meta = MetaData()
-
+# operation log table
 video_op = Table("sh_dm_video_op_v3", _meta,
     Column("id", Integer, primary_key=True, autoincrement=True),
     Column("v_id", String(64)),
@@ -53,7 +53,7 @@ video = Table("sh_dm_video_v3", _meta,
     Column("id", Integer, primary_key=True, autoincrement=True),
     Column("v_id", String(64)),
     Column("v_xid", String(64)),
-    Column("rn", String(50)),  # critical fix: add the rn column
+    Column("rn", String(50)),
     Column("v_name", String(255), nullable=False),
     Column("title", String(255), nullable=False),
     Column("link", String(255), nullable=False),
@@ -78,7 +78,6 @@ video = Table("sh_dm_video_v3", _meta,
     Column("is_repeat", Integer, default=0),
     Column("operatetime", Integer),
 )
-# author table
 video_author = Table(
     "sh_dm_video_author",
     _meta,
@@ -674,8 +673,8 @@ class DBVidcon:
 
 
 class DBSA:
-    FLUSH_EVERY_ROWS = 100  # row threshold
-    FLUSH_INTERVAL = 30     # seconds threshold
+    FLUSH_EVERY_ROWS = 100
+    FLUSH_INTERVAL = 30
 
     _buf_op: list = []
     _buf_vid: list = []
@@ -685,7 +684,6 @@ class DBSA:
 
     @staticmethod
    def push_record_many(rows):
-        """Mock method that pushes records back to Redis."""
         logger.info(f"[requeued to Redis] cnt={len(rows)}")
 
     @classmethod
@@ -693,7 +691,6 @@ class DBSA:
         data = copy.deepcopy(data)
         data.setdefault("a_id", 0)
         data.setdefault("history_status", "")
-        data.setdefault("is_repeat", 3)
         data.setdefault("keyword", "")
         data["sort"] = data.get("index", 0)
 
@@ -716,7 +713,7 @@
             "batch": data["batch"],
             "machine": data.get("machine_id", 0),
             "is_piracy": data.get("is_piracy", '3'),
-            "ts_status": data.get("ts_status", 1),
+            "ts_status": 1,  # default value
         }
 
         vid_row = {
@@ -730,6 +727,7 @@
             "public_time": data["create_time"],
             "cover_pic": data["cover_pic"],
             "sort": data["sort"],
+            "history_status": data.get("history_status", ""),
             "u_xid": data["u_xid"],
             "u_id": data["u_id"],
             "u_pic": data["u_pic"],
@@ -741,9 +739,6 @@
             "watch_number": data.get("view", 0),
             "follow_number": data.get("fans", 0),
             "video_number": data.get("videos", 0),
-            "is_repeat": 3,
-            "is_piracy": data.get("is_piracy", 3),
-            "ts_status": data.get("ts_status", 1),
         }
 
         with cls._lock:
@@ -752,19 +747,9 @@
             cls._buf_op.append(op_row)
             cls._buf_vid.append(vid_row)
             cls._buf_payload.append(data)
             buf_len = len(cls._buf_vid)
-            need_flush = False
-            flush_reason = ""
-
-            if buf_len >= cls.FLUSH_EVERY_ROWS:
-                need_flush = True
-                flush_reason = "ROWS"
-            elif time.time() - cls._last_flush >= cls.FLUSH_INTERVAL:
-                need_flush = True
-                flush_reason = "TIME"
-
-            if need_flush:
-                logger.info(f"DBSA flushing to DB ({flush_reason}) ...")
-                cls.flush()
+            if buf_len >= cls.FLUSH_EVERY_ROWS or time.time() - cls._last_flush >= cls.FLUSH_INTERVAL:
+                logger.info("DBSA flushing to DB ...")
+                cls.flush()
 
     @classmethod
     def flush(cls):
@@ -780,13 +765,7 @@
         if not op_rows and not vid_rows:
             return
 
-        # force is_repeat to 3 on every row
-        for i, vid_row in enumerate(vid_rows):
-            vid_row["is_repeat"] = 3
-            if i < len(op_rows):
-                op_rows[i]["is_repeat"] = 3
-
-        # process the author table
+        # write the author table
         authors_map = {}
         now_ts = int(time.time())
         for data in payloads:
@@ -808,7 +787,6 @@
 
         author_rows = list(authors_map.values())
         if author_rows:
-            author_rows.sort(key=lambda x: x["u_xid"])
             stmt_author = mysql_insert(video_author).values(author_rows)
             upd_author = {
                 "u_name": stmt_author.inserted.u_name,
@@ -819,39 +797,26 @@
                 "u_pic": stmt_author.inserted.u_pic,
                 "follow_number": stmt_author.inserted.follow_number,
                 "v_number": stmt_author.inserted.v_number,
                 "pv_number": stmt_author.inserted.pv_number,
                 "b_number": stmt_author.inserted.b_number,
                 "update_time": stmt_author.inserted.update_time,
             }
-            ondup_author = stmt_author.on_duplicate_key_update(**upd_author)
+            ondup = stmt_author.on_duplicate_key_update(**upd_author)
             try:
-                cls._execute_with_deadlock_retry(ondup_author)
+                cls._execute_with_deadlock_retry(ondup)
             except Exception as e:
                 logger.error(f"failed to write author table: {e}")
-                try:
-                    cls.push_record_many(payloads)
-                except Exception as re:
-                    logger.error("[Redis fallback failed]", re)
+                cls.push_record_many(payloads)
                 return
 
-        # insert into the operation log table
+        # write the operation log table
         try:
-            op_rows.sort(key=lambda x: (
-                x['rn'] or '',
-                x['name_title'] or '',
-                x['keyword'] or '',
-                x['v_xid'] or '',
-                x['createtime']
-            ))
             stmt = video_op.insert().values(op_rows)
             with _engine.begin() as conn:
                 conn.execute(stmt)
             logger.info(f"inserted {len(op_rows)} operation records")
         except Exception as e:
             logger.error(f"failed to insert operation records: {e}")
-            try:
-                cls.push_record_many(payloads)
-            except Exception as re:
-                logger.error("[Redis fallback failed]", re)
+            cls.push_record_many(payloads)
             return
 
-        # insert/update the video table
+        # write the video table (note: is_repeat / ts_status / rn excluded)
         try:
             stmt = mysql_insert(video).values(vid_rows)
             upd = {
                 "title": stmt.inserted.title,
@@ -863,9 +828,6 @@
                 "watch_number": stmt.inserted.watch_number,
                 "follow_number": stmt.inserted.follow_number,
                 "video_number": stmt.inserted.video_number,
-                "is_repeat": stmt.inserted.is_repeat,
-                "is_piracy": stmt.inserted.is_piracy,
-                "ts_status": stmt.inserted.ts_status,
             }
             ondup = stmt.on_duplicate_key_update(**upd)
             with _engine.begin() as conn:
                 conn.execute(ondup)
             logger.info(
                 ...
             )
         except Exception as e:
             logger.error(f"failed to write video table: {e}")
-            try:
-                cls.push_record_many(payloads)
-            except Exception as re:
-                logger.error("[Redis fallback failed]", re)
+            cls.push_record_many(payloads)
 
@@ -876,29 +838,20 @@
     @classmethod
     def update_video_stats(cls, locator: dict, stats: dict) -> int:
         v_xid = locator.get("v_xid")
-        rn = locator.get("rn")
-        if not v_xid or not rn:
-            raise ValueError("locator must contain 'v_xid' and 'rn'")
+        if not v_xid:
+            raise ValueError("locator must contain 'v_xid'")
 
         params = dict(stats)
         params["updatetime"] = int(time.time())
 
         valid_cols = set(video.c.keys())
         filtered_params = {k: v for k, v in params.items() if k in valid_cols}
 
-        stmt = (
-            video
-            .update()
-            .where(video.c.v_xid == v_xid, video.c.rn == rn)
-            .values(**filtered_params)
-        )
+        stmt = video.update().where(video.c.v_xid == v_xid).values(**filtered_params)
         with _engine.begin() as conn:
             result = conn.execute(stmt)
             return result.rowcount
@@ -924,4 +877,4 @@
             if code == 1213 and attempt < 2:
                 time.sleep(0.5 * (attempt + 1))
                 continue
-            raise
\ No newline at end of file
+            raise
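Review note: the hunks above route the author-table write through `_execute_with_deadlock_retry` and build the statement with MySQL's `INSERT ... ON DUPLICATE KEY UPDATE`. Below is a minimal, self-contained sketch of that pattern under stated assumptions: the reduced table definition, the connection URL, and the `retries` parameter are illustrative, not part of this diff.

```python
import time

from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.exc import OperationalError

# Placeholder DSN; the real engine config lives at the top of DB.py.
_engine = create_engine("mysql+pymysql://user:pass@localhost/example", future=True)
_meta = MetaData()

# Reduced stand-in for sh_dm_video_author; u_xid is assumed to be the unique
# key that ON DUPLICATE KEY UPDATE fires on.
video_author = Table(
    "sh_dm_video_author", _meta,
    Column("u_xid", String(64), primary_key=True),
    Column("u_name", String(255)),
    Column("update_time", Integer),
)

def execute_with_deadlock_retry(stmt, retries: int = 3) -> None:
    """Execute stmt, retrying on MySQL deadlock (error 1213) with linear backoff."""
    for attempt in range(retries):
        try:
            with _engine.begin() as conn:
                conn.execute(stmt)
            return
        except OperationalError as exc:
            code = exc.orig.args[0] if exc.orig and exc.orig.args else None
            if code == 1213 and attempt < retries - 1:
                time.sleep(0.5 * (attempt + 1))
                continue
            raise

rows = [{"u_xid": "x1q2kz3", "u_name": "alice", "update_time": int(time.time())}]
stmt = mysql_insert(video_author).values(rows)
# Insert new authors; on key collision, refresh the mutable columns in place.
ondup = stmt.on_duplicate_key_update(
    u_name=stmt.inserted.u_name,
    update_time=stmt.inserted.update_time,
)
execute_with_deadlock_retry(ondup)
```

One design question worth confirming: the diff drops the pre-insert `author_rows.sort(key=lambda x: x["u_xid"])`. Sorting batch rows by key is a common way to keep lock acquisition order stable and avoid the 1213 deadlocks this retry loop exists for, so removing it trades fewer CPU cycles for potentially more retries.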