diff --git a/DB.py b/DB.py index 685b211..ce2f064 100644 --- a/DB.py +++ b/DB.py @@ -25,8 +25,7 @@ _engine = create_engine( ) _meta = MetaData() -video_op = Table( - "sh_dm_video_op_v2", _meta, +video_op = Table("sh_dm_video_op_v2", _meta, Column("v_id", BigInteger, primary_key=True), Column("v_xid", String(64)), Column("a_id", Integer), @@ -35,7 +34,7 @@ video_op = Table( Column("keyword", String(255)), Column("rn", String(8)), Column("history_status", String(32)), - # Column("is_repeat", Integer), + Column("is_repeat", Integer), Column("sort", Integer), Column("createtime", Integer), Column("updatetime", Integer), @@ -43,8 +42,7 @@ video_op = Table( Column("machine", Integer), ) -video = Table( - "sh_dm_video_v2", _meta, +video = Table("sh_dm_video_v2", _meta, Column("v_id", BigInteger, primary_key=True), Column("v_xid", String(64)), Column("rn", String(8)), @@ -66,6 +64,7 @@ video = Table( ) + def mysql_retry(max_retries: int = 3, base_delay: float = 2.0): """ 装饰器工厂:捕获 InterfaceError 后断线重连并重试, @@ -442,14 +441,16 @@ class DBSA: _last_flush: float = time.time() _lock = threading.Lock() - push_record = staticmethod(lambda row: print("[退回Redis]", row["v_xid"])) + push_record_many = staticmethod( + lambda rows: print("[退回Redis] cnt=", len(rows)) + ) @classmethod def upsert_video(cls, data): data = copy.deepcopy(data) data.setdefault("a_id", 0) data.setdefault("history_status", "") - data.setdefault("is_repeat", 3) + data.setdefault("is_repeat", 3) # 避免 KeyError data["sort"] = data.get("index", 0) now_ts = int(time.time()) @@ -471,12 +472,15 @@ class DBSA: "status": 1, "createtime": now_ts, "updatetime": now_ts, } - need_flush = False with cls._lock: cls._buf_op.append(op_row) cls._buf_vid.append(vid_row) + cls._buf_payload.append(data) # 保存原始 buf_len = len(cls._buf_vid) print(f"DB缓冲 -> xid={data['v_xid']}, level={data['level']}, buffer={buf_len}") + + need_flush = False + flush_reason = "" if buf_len >= cls.FLUSH_EVERY_ROWS: need_flush = True flush_reason = "ROWS" @@ -515,22 +519,31 @@ class DBSA: @classmethod def flush(cls): with cls._lock: - op_rows, vid_rows = cls._buf_op[:], cls._buf_vid[:] + op_rows = cls._buf_op[:] + vid_rows = cls._buf_vid[:] + payloads = cls._buf_payload[:] cls._buf_op.clear() cls._buf_vid.clear() + cls._buf_payload.clear() cls._last_flush = time.time() if not op_rows and not vid_rows: return + + for r in vid_rows: + r.pop("is_repeat", None) + r.pop("level", None) + start = time.time() try: cls._bulk_insert(op_rows) cls._bulk_upsert(vid_rows) - elapsed = time.time() - start - print(f"[DBSA] 成 op={len(op_rows)} video={len(vid_rows)} time={elapsed:.3f}s") + print(f"[DBSA] 成 op={len(op_rows)} video={len(vid_rows)} time={time.time() - start:.3f}s") + except Exception as e: print(f"[DBSA] flush FAIL: {e} op={len(op_rows)} video={len(vid_rows)}") - for row in vid_rows: - db = DBVidcon() - db.push_record(row) - db.close() + # 批量退回原始 payload,字段最全 + try: + cls.push_record_many(payloads) + except Exception as re: + print("[Redis 回退失败]", re)