feat: 优化DB.py中的缓冲区逻辑,添加调试日志并移除冗余代码

This commit is contained in:
晓丰 2025-06-30 22:25:14 +08:00
parent 50dae2f8a6
commit e8ad57ff05

83
DB.py
View File

@ -676,10 +676,10 @@ class DBSA:
FLUSH_EVERY_ROWS = 100
FLUSH_INTERVAL = 30
_buf_op: list = []
_buf_vid: list = []
_buf_payload: list = []
_last_flush: float = time.time()
_buf_op = []
_buf_vid = []
_buf_payload = []
_last_flush = time.time()
_lock = threading.Lock()
@staticmethod
@ -713,12 +713,13 @@ class DBSA:
"batch": data["batch"],
"machine": data.get("machine_id", 0),
"is_piracy": data.get("is_piracy", '3'),
"ts_status": 1, # 默认值
"ts_status": 1,
}
vid_row = {
"v_id": data["v_id"],
"v_xid": data["v_xid"],
"rn": data["rn"],
"v_name": data["v_name"],
"title": data["title"],
"link": data["link"],
@ -739,16 +740,19 @@ class DBSA:
"watch_number": data.get("view", 0),
"follow_number": data.get("fans", 0),
"video_number": data.get("videos", 0),
"ts_status": 1,
"is_repeat": 3,
}
with cls._lock:
cls._buf_op.append(op_row)
cls._buf_vid.append(vid_row)
cls._buf_payload.append(data)
logger.debug(f"加入操作记录: {op_row}")
logger.debug(f"加入视频记录: {vid_row}")
buf_len = len(cls._buf_vid)
if buf_len >= cls.FLUSH_EVERY_ROWS or time.time() - cls._last_flush >= cls.FLUSH_INTERVAL:
logger.info("DBSA 落盘 ...")
if len(cls._buf_vid) >= cls.FLUSH_EVERY_ROWS or time.time() - cls._last_flush >= cls.FLUSH_INTERVAL:
logger.info("DBSA 落盘 (ROWS/TIME) ...")
cls.flush()
@classmethod
@ -765,9 +769,12 @@ class DBSA:
if not op_rows and not vid_rows:
return
# 写入作者表
authors_map = {}
logger.info("DBSA 开始 flush ...")
logger.debug(f"flush 操作记录数: {len(op_rows)}")
logger.debug(f"flush 视频记录数: {len(vid_rows)}")
now_ts = int(time.time())
authors_map = {}
for data in payloads:
u_xid = data.get("u_xid")
if not u_xid:
@ -786,18 +793,20 @@ class DBSA:
}
author_rows = list(authors_map.values())
logger.debug(f"flush 作者记录数: {len(author_rows)}")
if author_rows:
stmt_author = mysql_insert(video_author).values(author_rows)
upd_author = {
"u_name": stmt_author.inserted.u_name,
"u_pic": stmt_author.inserted.u_pic,
"follow_number": stmt_author.inserted.follow_number,
"v_number": stmt_author.inserted.v_number,
"pv_number": stmt_author.inserted.pv_number,
"b_number": stmt_author.inserted.b_number,
"update_time": stmt_author.inserted.update_time,
stmt = mysql_insert(video_author).values(author_rows)
upd = {
"u_name": stmt.inserted.u_name,
"u_pic": stmt.inserted.u_pic,
"follow_number": stmt.inserted.follow_number,
"v_number": stmt.inserted.v_number,
"pv_number": stmt.inserted.pv_number,
"b_number": stmt.inserted.b_number,
"update_time": stmt.inserted.update_time,
}
ondup = stmt_author.on_duplicate_key_update(**upd_author)
ondup = stmt.on_duplicate_key_update(**upd)
try:
cls._execute_with_deadlock_retry(ondup)
except Exception as e:
@ -805,7 +814,6 @@ class DBSA:
cls.push_record_many(payloads)
return
# 写入操作记录表
try:
stmt = video_op.insert().values(op_rows)
with _engine.begin() as conn:
@ -816,7 +824,6 @@ class DBSA:
cls.push_record_many(payloads)
return
# 写入视频表(注意不包含 is_repeat / ts_status / rn
try:
stmt = mysql_insert(video).values(vid_rows)
upd = {
@ -832,39 +839,11 @@ class DBSA:
ondup = stmt.on_duplicate_key_update(**upd)
with _engine.begin() as conn:
conn.execute(ondup)
logger.info(
f"[DBSA] flush 完成authors={len(author_rows)} 条, "
f"op={len(op_rows)} 条, video={len(vid_rows)}"
)
logger.info(f"[DBSA] flush 完成authors={len(author_rows)} 条, op={len(op_rows)} 条, video={len(vid_rows)}")
except Exception as e:
logger.error(f"写视频表失败: {e}")
cls.push_record_many(payloads)
@classmethod
def update_video_stats(cls, locator: dict, stats: dict) -> int:
v_xid = locator.get("v_xid")
if not v_xid:
raise ValueError("locator 必须包含 'v_xid'")
params = dict(stats)
params["updatetime"] = int(time.time())
valid_cols = set(video.c.keys())
filtered_params = {k: v for k, v in params.items() if k in valid_cols}
stmt = video.update().where(video.c.v_xid == v_xid).values(**filtered_params)
with _engine.begin() as conn:
result = conn.execute(stmt)
return result.rowcount
@classmethod
def update_video_stats_async(cls, locator: dict, stats: dict) -> None:
thread = threading.Thread(
target=cls.update_video_stats,
args=(locator, stats),
daemon=True
)
thread.start()
@classmethod
def _execute_with_deadlock_retry(cls, statement):
for attempt in range(3):
@ -877,4 +856,4 @@ class DBSA:
if code == 1213 and attempt < 2:
time.sleep(0.5 * (attempt + 1))
continue
raise
raise