feat: enhance the DB class with configurable parameters and robust SQL execution handling

晓丰 2025-07-04 20:55:22 +08:00
parent 82e65d86f8
commit 9d7a792937

DB.py (219 changed lines)

@@ -665,22 +665,33 @@ class DBVidcon:
         return json.loads(item) if item is not None else None
 
 
 class DBSA:
-    FLUSH_EVERY_ROWS = 100
-    FLUSH_INTERVAL = 30
+    # ---------- Tunable parameters, adjust as needed ----------
+    FLUSH_EVERY_ROWS = 100      # row-count threshold
+    FLUSH_INTERVAL = 30         # time threshold, seconds
+    MAX_SQL_RETRY = 3           # deadlock retries per SQL statement
+    SQL_RETRY_BASE_SLEEP = 0.5  # first retry waits 0.5 s, growing linearly
+    FLUSH_RETRY = 3             # max attempts for a whole flush
+    DELAY_ON_FAIL = 10          # seconds to wait after a failed flush
+    DEADLOCK_ERRNO = 1213       # MySQL deadlock error code
+    # ----------------------------------------------------------
+
     _buf_op = []
     _buf_vid = []
     _buf_payload = []
     _last_flush = time.time()
     _lock = threading.Lock()
     _existing_op_keys = set()
     _existing_vid_keys = set()
 
+    # ---------------- Public interface ----------------
     @staticmethod
     def push_record_many(rows):
+        """Placeholder that pushes rows back to Redis on failure."""
         logger.info(f"[push back to Redis] cnt={len(rows)}")
 
+    # ---------------- Write entry point ----------------
     @classmethod
     def upsert_video(cls, data):
         data = copy.deepcopy(data)
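
The two flush thresholds form a size-or-age trigger: a batch is flushed as soon as it holds FLUSH_EVERY_ROWS rows, or once FLUSH_INTERVAL seconds have passed since the last flush, whichever comes first. A minimal standalone sketch of that trigger, with write_batch as a hypothetical stand-in for the real DB write:

    import time

    FLUSH_EVERY_ROWS = 100   # size threshold
    FLUSH_INTERVAL = 30      # age threshold, seconds

    buf = []
    last_flush = time.time()

    def write_batch(rows):
        print(f"flushed {len(rows)} rows")   # hypothetical sink

    def add(row):
        global last_flush
        buf.append(row)
        # flush on whichever threshold is crossed first
        if len(buf) >= FLUSH_EVERY_ROWS or time.time() - last_flush >= FLUSH_INTERVAL:
            write_batch(buf[:])
            buf.clear()
            last_flush = time.time()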
@@ -691,25 +702,19 @@ class DBSA:
         now_ts = int(time.time())
-        op_index_key = (
-            data["v_xid"] or "",
-            data["keyword"] or "",
-            now_ts
-        )
-        vid_index_key = (
-            data["v_xid"] or "",
-            data["title"] or ""
-        )
+        op_index_key = (data["v_xid"] or "", data["keyword"] or "", now_ts)
+        vid_index_key = (data["v_xid"] or "", data["title"] or "")
 
+        # ---------- Buffer the row under the lock ----------
         with cls._lock:
             if op_index_key in cls._existing_op_keys:
                 logger.debug(f"skipping duplicate op record: {op_index_key}")
                 return
             if vid_index_key in cls._existing_vid_keys:
                 logger.debug(f"skipping duplicate video record: {vid_index_key}")
                 return
 
+            # Assemble op_row / vid_row (original logic unchanged)
             op_row = {
                 "v_id": data["v_id"],
                 "v_xid": data["v_xid"],
@@ -750,39 +755,88 @@ class DBSA:
                 "watch_number": data.get("view", 0),
                 "follow_number": data.get("fans", 0),
                 "video_number": data.get("videos", 0),
-                # "ts_status": data.get("ts_status", 1),
             }
 
             # Keep only columns that actually exist in the video table
             video_fields = {c.name for c in video.columns}
             vid_row = {k: v for k, v in vid_row.items() if k in video_fields}
 
+            # Append to the buffers
             cls._buf_op.append(op_row)
             cls._buf_vid.append(vid_row)
             cls._buf_payload.append(data)
             cls._existing_op_keys.add(op_index_key)
             cls._existing_vid_keys.add(vid_index_key)
 
-        if len(cls._buf_vid) >= cls.FLUSH_EVERY_ROWS or time.time() - cls._last_flush >= cls.FLUSH_INTERVAL:
+        # Check the flush thresholds (outside the lock)
+        if (len(cls._buf_vid) >= cls.FLUSH_EVERY_ROWS
+                or time.time() - cls._last_flush >= cls.FLUSH_INTERVAL):
             logger.info("flush: row-count or interval threshold reached, writing to DB")
             cls.flush()
 
+    # ------------- Safe SQL execution -------------
+    @classmethod
+    def _safe_execute(cls, statement, desc=""):
+        """Execute one statement, spinning on MySQL deadlocks."""
+        for attempt in range(cls.MAX_SQL_RETRY):
+            try:
+                with _engine.begin() as conn:
+                    conn.execute(statement)
+                return
+            except Exception as e:
+                err_no = getattr(getattr(e, "orig", None), "args", [None])[0]
+                if err_no == cls.DEADLOCK_ERRNO and attempt < cls.MAX_SQL_RETRY - 1:
+                    logger.warning("[%s] deadlock, retry %d/%d",
+                                   desc, attempt + 1, cls.MAX_SQL_RETRY)
+                    time.sleep(cls.SQL_RETRY_BASE_SLEEP * (attempt + 1))
+                    continue
+                raise
+
+    # ------------- Outer flush (whole-batch retry) -------------
     @classmethod
     def flush(cls):
+        for round_no in range(1, cls.FLUSH_RETRY + 1):
+            try:
+                cls._flush_once()
+                return  # success, done
+            except Exception as e:
+                logger.error("[flush] round %d failed: %s", round_no, e)
+                if round_no < cls.FLUSH_RETRY:
+                    logger.info("[flush] waiting %ds before retrying…", cls.DELAY_ON_FAIL)
+                    time.sleep(cls.DELAY_ON_FAIL)
+                else:
+                    logger.error("[flush] %d rounds failed, pushing back to Redis", cls.FLUSH_RETRY)
+                    cls.push_record_many(cls._buf_payload)
+                    # Clear the buffers so we cannot loop forever
+                    cls._buf_op.clear()
+                    cls._buf_vid.clear()
+                    cls._buf_payload.clear()
+                    cls._existing_op_keys.clear()
+                    cls._existing_vid_keys.clear()
+                    cls._last_flush = time.time()
+                    return
+
+    # ------------- The actual DB write -------------
+    @classmethod
+    def _flush_once(cls):
+        """One complete flush pass; any exception propagates to the caller."""
+        # --- Copy the buffers, then clear them ---
         with cls._lock:
             op_rows = cls._buf_op[:]
             vid_rows = cls._buf_vid[:]
             payloads = cls._buf_payload[:]
             cls._buf_op.clear()
             cls._buf_vid.clear()
             cls._buf_payload.clear()
+            cls._last_flush = time.time()
             cls._existing_op_keys.clear()
             cls._existing_vid_keys.clear()
-            cls._last_flush = time.time()
 
         if not op_rows and not vid_rows:
             return
 
+        # --- Write the author table ---
         authors_map = {}
         now_ts = int(time.time())
         for data in payloads:
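
Note: _safe_execute retries only MySQL error 1213 (ER_LOCK_DEADLOCK) and backs off linearly (0.5 s, 1.0 s, …); any other error is re-raised immediately. One caveat in the commit as written: flush() re-reads cls._buf_payload after _flush_once has already cleared it, so the retry rounds can only resend data if the buffers are restored on failure. The retry pattern itself, reduced to a standalone sketch (engine and statement are assumed to be the usual SQLAlchemy objects):

    import time
    from sqlalchemy.exc import DBAPIError

    MAX_SQL_RETRY = 3
    SQL_RETRY_BASE_SLEEP = 0.5
    DEADLOCK_ERRNO = 1213  # MySQL ER_LOCK_DEADLOCK

    def safe_execute(engine, statement):
        for attempt in range(MAX_SQL_RETRY):
            try:
                with engine.begin() as conn:   # one transaction per attempt
                    conn.execute(statement)
                return
            except DBAPIError as e:
                errno = getattr(e.orig, "args", [None])[0]
                if errno == DEADLOCK_ERRNO and attempt < MAX_SQL_RETRY - 1:
                    time.sleep(SQL_RETRY_BASE_SLEEP * (attempt + 1))  # 0.5 s, 1.0 s
                    continue
                raise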
@@ -802,83 +856,54 @@ class DBSA:
                 "update_time": now_ts
             }
 
-        author_rows = list(authors_map.values())
-        if author_rows:
-            stmt_author = mysql_insert(video_author).values(author_rows)
+        if authors_map:
+            stmt_author = mysql_insert(video_author).values(list(authors_map.values()))
             upd_author = {
                 "u_name": stmt_author.inserted.u_name,
                 "u_pic": stmt_author.inserted.u_pic,
                 "follow_number": stmt_author.inserted.follow_number,
                 "v_number": stmt_author.inserted.v_number,
                 "pv_number": stmt_author.inserted.pv_number,
                 "b_number": stmt_author.inserted.b_number,
                 "update_time": stmt_author.inserted.update_time,
             }
             ondup_author = stmt_author.on_duplicate_key_update(**upd_author)
-            try:
-                cls._execute_with_deadlock_retry(ondup_author)
-            except Exception as e:
-                logger.error(f"failed to write author table: {e}")
-                cls.push_record_many(payloads)
-                return
+            cls._safe_execute(ondup_author, desc="video_author")
 
+        # --- Write video_op ---
         if op_rows:
-            try:
-                stmt = mysql_insert(video_op).values(op_rows)
-                ondup = stmt.on_duplicate_key_update(
-                    updatetime=stmt.inserted.updatetime,
-                    operatetime=stmt.inserted.operatetime,
-                    ts_status=stmt.inserted.ts_status,
-                    is_repeat=stmt.inserted.is_repeat,
-                )
-                with _engine.begin() as conn:
-                    conn.execute(ondup)
-                logger.info(f"flush: {len(op_rows)} op rows")
-            except Exception as e:
-                logger.error(f"failed to write op table: {e}")
-                cls.push_record_many(payloads)
-                return
+            stmt_op = mysql_insert(video_op).values(op_rows)
+            ondup_op = stmt_op.on_duplicate_key_update(
+                updatetime=stmt_op.inserted.updatetime,
+                operatetime=stmt_op.inserted.operatetime,
+                ts_status=stmt_op.inserted.ts_status,
+                is_repeat=stmt_op.inserted.is_repeat,
+            )
+            cls._safe_execute(ondup_op, desc="video_op")
+            logger.info("flush: %d op rows", len(op_rows))
 
+        # --- Write video ---
         if vid_rows:
-            try:
-                stmt = mysql_insert(video).values(vid_rows)
-                upd = {
-                    "title": stmt.inserted.title,
-                    "link": stmt.inserted.link,
-                    "edition": stmt.inserted.edition,
-                    "duration": stmt.inserted.duration,
-                    "watch_number": stmt.inserted.watch_number,
-                    "follow_number": stmt.inserted.follow_number,
-                    "video_number": stmt.inserted.video_number,
-                    "public_time": stmt.inserted.public_time,
-                    "cover_pic": stmt.inserted.cover_pic,
-                    "sort": stmt.inserted.sort,
-                    "u_xid": stmt.inserted.u_xid,
-                    "u_id": stmt.inserted.u_id,
-                    "u_pic": stmt.inserted.u_pic,
-                    "u_name": stmt.inserted.u_name,
-                    "status": stmt.inserted.status,
-                    # "ts_status": stmt.inserted.ts_status,
-                    "updatetime": stmt.inserted.updatetime,
-                    "operatetime": stmt.inserted.operatetime,
-                }
-                ondup = stmt.on_duplicate_key_update(**upd)
-                with _engine.begin() as conn:
-                    conn.execute(ondup)
-                logger.info(f"flush: {len(vid_rows)} video rows")
-            except Exception as e:
-                logger.error(f"failed to write video table: {e}")
-                cls.push_record_many(payloads)
-
-    @classmethod
-    def _execute_with_deadlock_retry(cls, statement):
-        for attempt in range(3):
-            try:
-                with _engine.begin() as conn:
-                    conn.execute(statement)
-                return
-            except Exception as e:
-                if getattr(e.orig, "args", [None])[0] == 1213 and attempt < 2:
-                    time.sleep(0.5 * (attempt + 1))
-                    continue
-                raise
+            stmt_vid = mysql_insert(video).values(vid_rows)
+            upd = {
+                "title": stmt_vid.inserted.title,
+                "link": stmt_vid.inserted.link,
+                "edition": stmt_vid.inserted.edition,
+                "duration": stmt_vid.inserted.duration,
+                "watch_number": stmt_vid.inserted.watch_number,
+                "follow_number": stmt_vid.inserted.follow_number,
+                "video_number": stmt_vid.inserted.video_number,
+                "public_time": stmt_vid.inserted.public_time,
+                "cover_pic": stmt_vid.inserted.cover_pic,
+                "sort": stmt_vid.inserted.sort,
+                "u_xid": stmt_vid.inserted.u_xid,
+                "u_id": stmt_vid.inserted.u_id,
+                "u_pic": stmt_vid.inserted.u_pic,
+                "u_name": stmt_vid.inserted.u_name,
+                "status": stmt_vid.inserted.status,
+                "updatetime": stmt_vid.inserted.updatetime,
+                "operatetime": stmt_vid.inserted.operatetime,
+            }
+            ondup_vid = stmt_vid.on_duplicate_key_update(**upd)
+            cls._safe_execute(ondup_vid, desc="video")
+            logger.info("flush: %d video rows", len(vid_rows))
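
All three writes share the same MySQL upsert shape: INSERT … ON DUPLICATE KEY UPDATE, built with SQLAlchemy's MySQL dialect, where stmt.inserted.<col> refers to the value the INSERT would have written for that column. A minimal self-contained sketch (the table layout and DSN are invented for illustration):

    import time
    from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
    from sqlalchemy.dialects.mysql import insert as mysql_insert

    engine = create_engine("mysql+pymysql://user:pass@localhost/demo")  # hypothetical DSN
    meta = MetaData()
    video = Table(
        "video", meta,
        Column("v_xid", String(64), primary_key=True),
        Column("title", String(255)),
        Column("updatetime", Integer),
    )

    stmt = mysql_insert(video).values(
        [{"v_xid": "x1", "title": "A cat video", "updatetime": int(time.time())}]
    )
    # On a duplicate key, overwrite the row with the values the INSERT carried
    ondup = stmt.on_duplicate_key_update(
        title=stmt.inserted.title,
        updatetime=stmt.inserted.updatetime,
    )
    with engine.begin() as conn:
        conn.execute(ondup)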