feat: optimize the video and op-record UPSERT logic in DB.py; add duplicate-record checks and logging

晓丰 2025-07-01 19:18:43 +08:00
parent f4c632a46d
commit ab40a4ea3b

DB.py (256 lines changed)

@@ -671,6 +671,13 @@ class DBVidcon:
         # If the stored value is a JSON string, deserialize it here:
         return json.loads(item) if item is not None else None
+import time
+import threading
+import copy
+from datetime import datetime
+from sqlalchemy.dialects.mysql import insert as mysql_insert
+from app import _engine, logger
+from app.models import video, video_op, video_author
 class DBSA:
     FLUSH_EVERY_ROWS = 100
@@ -681,6 +688,8 @@ class DBSA:
     _buf_payload = []
     _last_flush = time.time()
     _lock = threading.Lock()
+    _existing_op_keys = set()
+    _existing_vid_keys = set()

     @staticmethod
     def push_record_many(rows):
@@ -690,66 +699,85 @@ class DBSA:
     def upsert_video(cls, data):
         data = copy.deepcopy(data)
         data.setdefault("a_id", 0)
-        data.setdefault("history_status", "")
+        data.setdefault("is_repeat", 3)
         data.setdefault("keyword", "")
         data["sort"] = data.get("index", 0)
         now_ts = int(time.time())
-        op_row = {
-            "v_id": data["v_id"],
-            "v_xid": data["v_xid"],
-            "a_id": data["a_id"],
-            "level": data["level"],
-            "name_title": data["v_name"],
-            "keyword": data["keyword"],
-            "rn": data["rn"],
-            "history_status": data["history_status"],
-            "is_repeat": 3,
-            "sort": data["sort"],
-            "createtime": now_ts,
-            "updatetime": now_ts,
-            "operatetime": now_ts,
-            "batch": data["batch"],
-            "machine": data.get("machine_id", 0),
-            "is_piracy": data.get("is_piracy", '3'),
-            "ts_status": 1,
-        }
-        vid_row = {
-            "v_id": data["v_id"],
-            "v_xid": data["v_xid"],
-            "v_name": data["v_name"],
-            "title": data["title"],
-            "link": data["link"],
-            "edition": "",
-            "duration": str(data["duration"]) if data.get("duration") else '0',
-            "public_time": data["create_time"],
-            "cover_pic": data["cover_pic"],
-            "sort": data["sort"],
-            "history_status": data.get("history_status", ""),
-            "u_xid": data["u_xid"],
-            "u_id": data["u_id"],
-            "u_pic": data["u_pic"],
-            "u_name": data["u_name"],
-            "status": 1,
-            "createtime": now_ts,
-            "updatetime": now_ts,
-            "operatetime": now_ts,
-            "watch_number": data.get("view", 0),
-            "follow_number": data.get("fans", 0),
-            "video_number": data.get("videos", 0),
-            "ts_status": 1,
-            "is_repeat": 3,
-        }
+        op_index_key = (
+            data["v_xid"] or "",
+            data["keyword"] or "",
+            now_ts
+        )
+        vid_index_key = (
+            data["v_xid"] or "",
+            data["title"] or ""
+        )
         with cls._lock:
+            if op_index_key in cls._existing_op_keys:
+                logger.debug(f"Skipping duplicate op record: {op_index_key}")
+                return
+            if vid_index_key in cls._existing_vid_keys:
+                logger.debug(f"Skipping duplicate video record: {vid_index_key}")
+                return
+            # op-record table
+            op_row = {
+                "v_id": data["v_id"],
+                "v_xid": data["v_xid"],
+                "a_id": data["a_id"],
+                "level": data["level"],
+                "name_title": data["title"],
+                "keyword": data["keyword"],
+                "is_repeat": data["is_repeat"],
+                "sort": data["sort"],
+                "createtime": now_ts,
+                "updatetime": now_ts,
+                "operatetime": now_ts,
+                "batch": data["batch"],
+                "machine": data.get("machine_id", 0),
+                "is_piracy": data.get("is_piracy", '3'),
+                "ts_status": data.get("ts_status", 1),
+            }
+            # video table
+            vid_row = {
+                "v_id": data["v_id"],
+                "v_xid": data["v_xid"],
+                "title": data["title"],
+                "link": data["link"],
+                "edition": "",
+                "duration": str(data["duration"]) if data.get("duration") else '0',
+                "public_time": data["create_time"],
+                "cover_pic": data["cover_pic"],
+                "sort": data["sort"],
+                "u_xid": data["u_xid"],
+                "u_id": data["u_id"],
+                "u_pic": data["u_pic"],
+                "u_name": data["u_name"],
+                "status": 1,
+                "createtime": now_ts,
+                "updatetime": now_ts,
+                "operatetime": now_ts,
+                "watch_number": data.get("view", 0),
+                "follow_number": data.get("fans", 0),
+                "video_number": data.get("videos", 0),
+                "is_repeat": 3,
+                "is_piracy": data.get("is_piracy", 3),
+                "ts_status": data.get("ts_status", 1),
+            }
             cls._buf_op.append(op_row)
             cls._buf_vid.append(vid_row)
             cls._buf_payload.append(data)
+            cls._existing_op_keys.add(op_index_key)
+            cls._existing_vid_keys.add(vid_index_key)
             if len(cls._buf_vid) >= cls.FLUSH_EVERY_ROWS or time.time() - cls._last_flush >= cls.FLUSH_INTERVAL:
-                logger.info("DBSA flush (ROWS/TIME) ...")
+                logger.info("Flush: row-count or time threshold reached, writing to DB")
                 cls.flush()

     @classmethod
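The duplicate check above keys op records by (v_xid, keyword, now_ts) and video records by (v_xid, title), guarded by the class lock. A minimal standalone sketch of that pattern, under the same assumptions; should_buffer and its module-level names are hypothetical, not part of DB.py:

import threading
import time

_existing_op_keys = set()
_existing_vid_keys = set()
_lock = threading.Lock()

def should_buffer(data: dict) -> bool:
    # Mirror the key shapes used in the hunk above.
    now_ts = int(time.time())
    op_key = (data.get("v_xid") or "", data.get("keyword") or "", now_ts)
    vid_key = (data.get("v_xid") or "", data.get("title") or "")
    with _lock:
        if op_key in _existing_op_keys or vid_key in _existing_vid_keys:
            return False  # drop the duplicate instead of buffering it twice
        _existing_op_keys.add(op_key)
        _existing_vid_keys.add(vid_key)
        return True

Note that because now_ts is part of the op key, the op-side check only catches duplicates pushed within the same second, while the video-side key deduplicates across the whole buffer window until flush() clears the sets.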
@@ -762,17 +790,15 @@ class DBSA:
         cls._buf_vid.clear()
         cls._buf_payload.clear()
         cls._last_flush = time.time()
+        cls._existing_op_keys.clear()
+        cls._existing_vid_keys.clear()

         if not op_rows and not vid_rows:
             return
-        logger.info("DBSA flush starting ...")
-        logger.debug(f"flush op-record count: {len(op_rows)}")
-        logger.debug(f"flush video-record count: {len(vid_rows)}")
-        # author table handling
-        now_ts = int(time.time())
+        # ====== write author table ======
         authors_map = {}
+        now_ts = int(time.time())
         for data in payloads:
             u_xid = data.get("u_xid")
             if not u_xid:
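This hunk clears the key sets together with the row buffers at the top of flush(), re-arming the duplicate check for the next batch. A sketch of the snapshot-and-clear idea, assuming the buffers are drained under the same lock before writing; _drain_buffers is a hypothetical helper, the real flush() inlines this:

    @classmethod
    def _drain_buffers(cls):
        # Swap the buffers out under the lock so producers can keep appending
        # while the snapshot is written to MySQL.
        with cls._lock:
            op_rows, cls._buf_op = cls._buf_op, []
            vid_rows, cls._buf_vid = cls._buf_vid, []
            payloads, cls._buf_payload = cls._buf_payload, []
            cls._existing_op_keys.clear()
            cls._existing_vid_keys.clear()
            cls._last_flush = time.time()
        return op_rows, vid_rows, payloads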
@@ -791,77 +817,76 @@ class DBSA:
             }

         author_rows = list(authors_map.values())
-        logger.debug(f"flush author-record count: {len(author_rows)}")
         if author_rows:
-            stmt = mysql_insert(video_author).values(author_rows)
-            upd = {
-                "u_name": stmt.inserted.u_name,
-                "u_pic": stmt.inserted.u_pic,
-                "follow_number": stmt.inserted.follow_number,
-                "v_number": stmt.inserted.v_number,
-                "pv_number": stmt.inserted.pv_number,
-                "b_number": stmt.inserted.b_number,
-                "update_time": stmt.inserted.update_time,
-            }
-            ondup = stmt.on_duplicate_key_update(**upd)
+            stmt_author = mysql_insert(video_author).values(author_rows)
+            upd_author = {
+                "u_name": stmt_author.inserted.u_name,
+                "u_pic": stmt_author.inserted.u_pic,
+                "follow_number": stmt_author.inserted.follow_number,
+                "v_number": stmt_author.inserted.v_number,
+                "pv_number": stmt_author.inserted.pv_number,
+                "b_number": stmt_author.inserted.b_number,
+                "update_time": stmt_author.inserted.update_time,
+            }
+            ondup_author = stmt_author.on_duplicate_key_update(**upd_author)
             try:
-                cls._execute_with_deadlock_retry(ondup)
+                cls._execute_with_deadlock_retry(ondup_author)
             except Exception as e:
                 logger.error(f"Failed to write author table: {e}")
-                try:
-                    cls.push_record_many(payloads)
-                except Exception as re:
-                    logger.error("[Redis fallback failed]", re)
+                cls.push_record_many(payloads)
                 return

-        # op-record UPSERT (resolve unique-index conflicts)
-        try:
-            stmt = mysql_insert(video_op).values(op_rows)
-            ondup = stmt.on_duplicate_key_update(
-                updatetime=stmt.inserted.updatetime,
-                operatetime=stmt.inserted.operatetime,
-                ts_status=stmt.inserted.ts_status,
-                is_repeat=stmt.inserted.is_repeat,
-                history_status=stmt.inserted.history_status,
-            )
-            with _engine.begin() as conn:
-                conn.execute(ondup)
-            logger.info(f"Inserted/updated op records: {len(op_rows)}")
-        except Exception as e:
-            logger.error(f"op-record UPSERT failed: {e}")
-            try:
-                cls.push_record_many(payloads)
-            except Exception as re:
-                logger.error("[Redis fallback failed]", re)
-            return
+        # ====== write op-record table (UPSERT) ======
+        if op_rows:
+            try:
+                stmt = mysql_insert(video_op).values(op_rows)
+                ondup = stmt.on_duplicate_key_update(
+                    updatetime=stmt.inserted.updatetime,
+                    operatetime=stmt.inserted.operatetime,
+                    ts_status=stmt.inserted.ts_status,
+                    is_repeat=stmt.inserted.is_repeat,
+                )
+                with _engine.begin() as conn:
+                    conn.execute(ondup)
+                logger.info(f"Flushed op records: {len(op_rows)}")
+            except Exception as e:
+                logger.error(f"Failed to write op-record table: {e}")
+                cls.push_record_many(payloads)
+                return

-        # video-table UPSERT
-        try:
-            stmt = mysql_insert(video).values(vid_rows)
-            upd = {
-                "title": stmt.inserted.title,
-                "duration": stmt.inserted.duration,
-                "cover_pic": stmt.inserted.cover_pic,
-                "sort": stmt.inserted.sort,
-                "updatetime": stmt.inserted.updatetime,
-                "watch_number": stmt.inserted.watch_number,
-                "follow_number": stmt.inserted.follow_number,
-                "video_number": stmt.inserted.video_number,
-                "is_repeat": stmt.inserted.is_repeat,
-                "ts_status": stmt.inserted.ts_status,
-            }
-            ondup = stmt.on_duplicate_key_update(**upd)
-            with _engine.begin() as conn:
-                conn.execute(ondup)
-            logger.info(
-                f"[DBSA] flush done: authors={len(author_rows)}, op={len(op_rows)}, video={len(vid_rows)}")
-        except Exception as e:
-            logger.error(f"Failed to write video table: {e}")
-            try:
-                cls.push_record_many(payloads)
-            except Exception as re:
-                logger.error("[Redis fallback failed]", re)
+        # ====== write video table (UPSERT) ======
+        if vid_rows:
+            try:
+                stmt = mysql_insert(video).values(vid_rows)
+                upd = {
+                    "title": stmt.inserted.title,
+                    "link": stmt.inserted.link,
+                    "is_piracy": stmt.inserted.is_piracy,
+                    "edition": stmt.inserted.edition,
+                    "duration": stmt.inserted.duration,
+                    "watch_number": stmt.inserted.watch_number,
+                    "follow_number": stmt.inserted.follow_number,
+                    "video_number": stmt.inserted.video_number,
+                    "public_time": stmt.inserted.public_time,
+                    "cover_pic": stmt.inserted.cover_pic,
+                    "sort": stmt.inserted.sort,
+                    "u_xid": stmt.inserted.u_xid,
+                    "u_id": stmt.inserted.u_id,
+                    "u_pic": stmt.inserted.u_pic,
+                    "u_name": stmt.inserted.u_name,
+                    "status": stmt.inserted.status,
+                    "ts_status": stmt.inserted.ts_status,
+                    "updatetime": stmt.inserted.updatetime,
+                    "operatetime": stmt.inserted.operatetime,
+                    "is_repeat": stmt.inserted.is_repeat,
+                }
+                ondup = stmt.on_duplicate_key_update(**upd)
+                with _engine.begin() as conn:
+                    conn.execute(ondup)
+                logger.info(f"Flushed video records: {len(vid_rows)}")
+            except Exception as e:
+                logger.error(f"Failed to write video table: {e}")
+                cls.push_record_many(payloads)

     @classmethod
     def _execute_with_deadlock_retry(cls, statement):
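All three writes use the same SQLAlchemy MySQL-dialect UPSERT: mysql_insert(...).values(rows) combined with on_duplicate_key_update(), where stmt.inserted.<col> compiles to MySQL's VALUES(col), i.e. the value the conflicting row would have inserted. A self-contained sketch with a hypothetical table (the real tables come from app.models):

from sqlalchemy import Column, Integer, String, MetaData, Table
from sqlalchemy.dialects.mysql import insert as mysql_insert

metadata = MetaData()
demo_video = Table(  # hypothetical stand-in for app.models.video
    "video", metadata,
    Column("v_xid", String(64), primary_key=True),
    Column("title", String(255)),
    Column("updatetime", Integer),
)

def upsert_rows(engine, rows):
    stmt = mysql_insert(demo_video).values(rows)
    # On a unique-key conflict, overwrite these columns with the values
    # from the attempted insert (VALUES(col) in the generated SQL).
    ondup = stmt.on_duplicate_key_update(
        title=stmt.inserted.title,
        updatetime=stmt.inserted.updatetime,
    )
    with engine.begin() as conn:
        conn.execute(ondup)

If a statement fails, the hunk falls back to push_record_many(payloads), re-queuing the raw payloads (to Redis, per the removed log text) so the batch is not lost.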
@@ -870,9 +895,8 @@ class DBSA:
                 with _engine.begin() as conn:
                     conn.execute(statement)
                 return
-            except OperationalError as e:
-                code = getattr(e.orig, "args", [None])[0]
-                if code == 1213 and attempt < 2:
+            except Exception as e:
+                if getattr(e.orig, "args", [None])[0] == 1213 and attempt < 2:
                     time.sleep(0.5 * (attempt + 1))
                     continue
                 raise
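One caveat with this last hunk: it catches bare Exception but still dereferences e.orig, which only exists on SQLAlchemy's DBAPI-wrapping errors, so a non-database exception would raise AttributeError inside the handler. A sketch that keeps the narrower OperationalError catch; error code 1213 is MySQL's deadlock signal, and the attempts parameter is a hypothetical generalization of the hard-coded three tries:

import time
from sqlalchemy.exc import OperationalError

def execute_with_deadlock_retry(engine, statement, attempts=3):
    for attempt in range(attempts):
        try:
            with engine.begin() as conn:
                conn.execute(statement)
            return
        except OperationalError as e:
            code = getattr(e.orig, "args", [None])[0]
            if code == 1213 and attempt < attempts - 1:
                # Deadlock detected: back off linearly and retry the batch.
                time.sleep(0.5 * (attempt + 1))
                continue
            raise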