feat: optimize the write logic for the video and operation-record tables; remove redundant fields

This commit is contained in:
晓丰 2025-06-30 22:10:34 +08:00
parent 3e5ec774ca
commit 50dae2f8a6
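A minimal caller-side sketch of the relaxed update path (illustrative only, not part of this commit: the import path DB, the v_xid value, and the counter numbers are assumptions). After this change the locator needs only v_xid, the old rn filter is gone, and stats keys that are not columns of sh_dm_video_v3 are dropped before the UPDATE:

from DB import DBSA  # assumed import path for the DB.py changed below

# Locate the video row by v_xid alone and bump its counters;
# update_video_stats adds updatetime and filters out unknown keys itself.
updated = DBSA.update_video_stats(
    {"v_xid": "x8example"},                       # hypothetical video xid
    {"watch_number": 1200, "follow_number": 56},  # only valid video columns survive
)
print(f"rows updated: {updated}")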

DB.py

@@ -26,7 +26,7 @@ _engine = create_engine(
future=True,
)
_meta = MetaData()
# operation record table
video_op = Table("sh_dm_video_op_v3", _meta,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("v_id", String(64)),
@@ -53,7 +53,7 @@ video = Table("sh_dm_video_v3", _meta,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("v_id", String(64)),
Column("v_xid", String(64)),
Column("rn", String(50)), # 关键修复添加rn列
Column("rn", String(50)),
Column("v_name", String(255), nullable=False),
Column("title", String(255), nullable=False),
Column("link", String(255), nullable=False),
@@ -78,7 +78,6 @@ video = Table("sh_dm_video_v3", _meta,
Column("is_repeat", Integer, default=0),
Column("operatetime", Integer),
)
# author table
video_author = Table(
"sh_dm_video_author",
@@ -674,8 +673,8 @@ class DBVidcon:
class DBSA:
FLUSH_EVERY_ROWS = 100 # row threshold
FLUSH_INTERVAL = 30 # seconds threshold
FLUSH_EVERY_ROWS = 100
FLUSH_INTERVAL = 30
_buf_op: list = []
_buf_vid: list = []
@@ -685,7 +684,6 @@ class DBSA:
@staticmethod
def push_record_many(rows):
"""退回Redis的模拟方法"""
logger.info(f"[退回Redis] cnt={len(rows)}")
@classmethod
@@ -693,7 +691,6 @@
data = copy.deepcopy(data)
data.setdefault("a_id", 0)
data.setdefault("history_status", "")
data.setdefault("is_repeat", 3)
data.setdefault("keyword", "")
data["sort"] = data.get("index", 0)
@@ -716,7 +713,7 @@ class DBSA:
"batch": data["batch"],
"machine": data.get("machine_id", 0),
"is_piracy": data.get("is_piracy", '3'),
"ts_status": data.get("ts_status", 1),
"ts_status": 1, # 默认值
}
vid_row = {
@@ -730,6 +727,7 @@ class DBSA:
"public_time": data["create_time"],
"cover_pic": data["cover_pic"],
"sort": data["sort"],
"history_status": data.get("history_status", ""),
"u_xid": data["u_xid"],
"u_id": data["u_id"],
"u_pic": data["u_pic"],
@@ -741,9 +739,6 @@ class DBSA:
"watch_number": data.get("view", 0),
"follow_number": data.get("fans", 0),
"video_number": data.get("videos", 0),
"is_repeat": 3,
"is_piracy": data.get("is_piracy", 3),
"ts_status": data.get("ts_status", 1),
}
with cls._lock:
@@ -752,18 +747,8 @@ class DBSA:
cls._buf_payload.append(data)
buf_len = len(cls._buf_vid)
need_flush = False
flush_reason = ""
if buf_len >= cls.FLUSH_EVERY_ROWS:
need_flush = True
flush_reason = "ROWS"
elif time.time() - cls._last_flush >= cls.FLUSH_INTERVAL:
need_flush = True
flush_reason = "TIME"
if need_flush:
logger.info(f"DBSA 落盘 ({flush_reason}) ...")
if buf_len >= cls.FLUSH_EVERY_ROWS or time.time() - cls._last_flush >= cls.FLUSH_INTERVAL:
logger.info("DBSA 落盘 ...")
cls.flush()
@classmethod
@@ -780,13 +765,7 @@ class DBSA:
if not op_rows and not vid_rows:
return
# force is_repeat to 3 for every row
for i, vid_row in enumerate(vid_rows):
vid_row["is_repeat"] = 3
if i < len(op_rows):
op_rows[i]["is_repeat"] = 3
# process the author table
# write the author table
authors_map = {}
now_ts = int(time.time())
for data in payloads:
@@ -808,7 +787,6 @@ class DBSA:
author_rows = list(authors_map.values())
if author_rows:
author_rows.sort(key=lambda x: x["u_xid"])
stmt_author = mysql_insert(video_author).values(author_rows)
upd_author = {
"u_name": stmt_author.inserted.u_name,
@@ -819,39 +797,26 @@ class DBSA:
"b_number": stmt_author.inserted.b_number,
"update_time": stmt_author.inserted.update_time,
}
ondup_author = stmt_author.on_duplicate_key_update(**upd_author)
ondup = stmt_author.on_duplicate_key_update(**upd_author)
try:
cls._execute_with_deadlock_retry(ondup_author)
cls._execute_with_deadlock_retry(ondup)
except Exception as e:
logger.error(f"写作者表失败: {e}")
try:
cls.push_record_many(payloads)
except Exception as re:
logger.error("[Redis 回退失败]", re)
return
# insert into the operation record table
try:
op_rows.sort(key=lambda x: (
x['rn'] or '',
x['name_title'] or '',
x['keyword'] or '',
x['v_xid'] or '',
x['createtime']
))
stmt = video_op.insert().values(op_rows)
with _engine.begin() as conn:
conn.execute(stmt)
logger.info(f"插入操作记录: {len(op_rows)}")
except Exception as e:
logger.error(f"插入操作记录失败: {e}")
try:
cls.push_record_many(payloads)
except Exception as re:
logger.error("[Redis 回退失败]", re)
return
# insert/update the video table
# write the video table (note: is_repeat / ts_status / rn are not included)
try:
stmt = mysql_insert(video).values(vid_rows)
upd = {
@@ -863,9 +828,6 @@ class DBSA:
"watch_number": stmt.inserted.watch_number,
"follow_number": stmt.inserted.follow_number,
"video_number": stmt.inserted.video_number,
"is_repeat": stmt.inserted.is_repeat,
"is_piracy": stmt.inserted.is_piracy,
"ts_status": stmt.inserted.ts_status,
}
ondup = stmt.on_duplicate_key_update(**upd)
with _engine.begin() as conn:
@@ -876,29 +838,20 @@ class DBSA:
)
except Exception as e:
logger.error(f"写视频表失败: {e}")
try:
cls.push_record_many(payloads)
except Exception as re:
logger.error("[Redis 回退失败]", re)
@classmethod
def update_video_stats(cls, locator: dict, stats: dict) -> int:
v_xid = locator.get("v_xid")
rn = locator.get("rn")
if not v_xid or not rn:
raise ValueError("locator 必须包含 'v_xid''rn'")
if not v_xid:
raise ValueError("locator 必须包含 'v_xid'")
params = dict(stats)
params["updatetime"] = int(time.time())
valid_cols = set(video.c.keys())
filtered_params = {k: v for k, v in params.items() if k in valid_cols}
stmt = (
video
.update()
.where(video.c.v_xid == v_xid, video.c.rn == rn)
.values(**filtered_params)
)
stmt = video.update().where(video.c.v_xid == v_xid).values(**filtered_params)
with _engine.begin() as conn:
result = conn.execute(stmt)
return result.rowcount