feat: 优化 DB.py 代码,简化表定义并增强 upsert_video 方法的调试信息

This commit is contained in:
晓丰 2025-05-21 01:41:32 +08:00
parent 9a30c2f86d
commit 06503422ee

43
DB.py
View File

@ -25,8 +25,7 @@ _engine = create_engine(
) )
_meta = MetaData() _meta = MetaData()
video_op = Table( video_op = Table("sh_dm_video_op_v2", _meta,
"sh_dm_video_op_v2", _meta,
Column("v_id", BigInteger, primary_key=True), Column("v_id", BigInteger, primary_key=True),
Column("v_xid", String(64)), Column("v_xid", String(64)),
Column("a_id", Integer), Column("a_id", Integer),
@ -35,7 +34,7 @@ video_op = Table(
Column("keyword", String(255)), Column("keyword", String(255)),
Column("rn", String(8)), Column("rn", String(8)),
Column("history_status", String(32)), Column("history_status", String(32)),
# Column("is_repeat", Integer), Column("is_repeat", Integer),
Column("sort", Integer), Column("sort", Integer),
Column("createtime", Integer), Column("createtime", Integer),
Column("updatetime", Integer), Column("updatetime", Integer),
@ -43,8 +42,7 @@ video_op = Table(
Column("machine", Integer), Column("machine", Integer),
) )
video = Table( video = Table("sh_dm_video_v2", _meta,
"sh_dm_video_v2", _meta,
Column("v_id", BigInteger, primary_key=True), Column("v_id", BigInteger, primary_key=True),
Column("v_xid", String(64)), Column("v_xid", String(64)),
Column("rn", String(8)), Column("rn", String(8)),
@ -66,6 +64,7 @@ video = Table(
) )
def mysql_retry(max_retries: int = 3, base_delay: float = 2.0): def mysql_retry(max_retries: int = 3, base_delay: float = 2.0):
""" """
装饰器工厂捕获 InterfaceError 后断线重连并重试 装饰器工厂捕获 InterfaceError 后断线重连并重试
@ -442,14 +441,16 @@ class DBSA:
_last_flush: float = time.time() _last_flush: float = time.time()
_lock = threading.Lock() _lock = threading.Lock()
push_record = staticmethod(lambda row: print("[退回Redis]", row["v_xid"])) push_record_many = staticmethod(
lambda rows: print("[退回Redis] cnt=", len(rows))
)
@classmethod @classmethod
def upsert_video(cls, data): def upsert_video(cls, data):
data = copy.deepcopy(data) data = copy.deepcopy(data)
data.setdefault("a_id", 0) data.setdefault("a_id", 0)
data.setdefault("history_status", "") data.setdefault("history_status", "")
data.setdefault("is_repeat", 3) data.setdefault("is_repeat", 3) # 避免 KeyError
data["sort"] = data.get("index", 0) data["sort"] = data.get("index", 0)
now_ts = int(time.time()) now_ts = int(time.time())
@ -471,12 +472,15 @@ class DBSA:
"status": 1, "createtime": now_ts, "updatetime": now_ts, "status": 1, "createtime": now_ts, "updatetime": now_ts,
} }
need_flush = False
with cls._lock: with cls._lock:
cls._buf_op.append(op_row) cls._buf_op.append(op_row)
cls._buf_vid.append(vid_row) cls._buf_vid.append(vid_row)
cls._buf_payload.append(data) # 保存原始
buf_len = len(cls._buf_vid) buf_len = len(cls._buf_vid)
print(f"DB缓冲 -> xid={data['v_xid']}, level={data['level']}, buffer={buf_len}") print(f"DB缓冲 -> xid={data['v_xid']}, level={data['level']}, buffer={buf_len}")
need_flush = False
flush_reason = ""
if buf_len >= cls.FLUSH_EVERY_ROWS: if buf_len >= cls.FLUSH_EVERY_ROWS:
need_flush = True need_flush = True
flush_reason = "ROWS" flush_reason = "ROWS"
@ -515,22 +519,31 @@ class DBSA:
@classmethod @classmethod
def flush(cls): def flush(cls):
with cls._lock: with cls._lock:
op_rows, vid_rows = cls._buf_op[:], cls._buf_vid[:] op_rows = cls._buf_op[:]
vid_rows = cls._buf_vid[:]
payloads = cls._buf_payload[:]
cls._buf_op.clear() cls._buf_op.clear()
cls._buf_vid.clear() cls._buf_vid.clear()
cls._buf_payload.clear()
cls._last_flush = time.time() cls._last_flush = time.time()
if not op_rows and not vid_rows: if not op_rows and not vid_rows:
return return
for r in vid_rows:
r.pop("is_repeat", None)
r.pop("level", None)
start = time.time() start = time.time()
try: try:
cls._bulk_insert(op_rows) cls._bulk_insert(op_rows)
cls._bulk_upsert(vid_rows) cls._bulk_upsert(vid_rows)
elapsed = time.time() - start print(f"[DBSA] 成 op={len(op_rows)} video={len(vid_rows)} time={time.time() - start:.3f}s")
print(f"[DBSA] 成 op={len(op_rows)} video={len(vid_rows)} time={elapsed:.3f}s")
except Exception as e: except Exception as e:
print(f"[DBSA] flush FAIL: {e} op={len(op_rows)} video={len(vid_rows)}") print(f"[DBSA] flush FAIL: {e} op={len(op_rows)} video={len(vid_rows)}")
for row in vid_rows: # 批量退回原始 payload字段最全
db = DBVidcon() try:
db.push_record(row) cls.push_record_many(payloads)
db.close() except Exception as re:
print("[Redis 回退失败]", re)