feat: 添加视频作者表并更新视频数据处理逻辑

This commit is contained in:
晓丰 2025-06-01 18:27:55 +08:00
parent d34be05b6b
commit 45ced06532

131
DB.py
View File

@ -1,17 +1,17 @@
import json
import redis
import pymysql
import time
import functools
from redis.exceptions import ConnectionError, TimeoutError
import time, copy, threading
from typing import List, Dict
from sqlalchemy import (
create_engine, MetaData, Table, Column,
BigInteger, Integer, String, Text
BigInteger, Integer, String, Text, DateTime
)
from sqlalchemy.dialects.mysql import insert as mysql_insert
from logger import logger
from datetime import datetime
MYSQL_URL = (
"mysql+pymysql://db_vidcon:rexdK4fhCCiRE4BZ"
@ -67,6 +67,21 @@ video = Table("sh_dm_video_v2", _meta,
Column("follow_number", Integer),
Column("video_number", Integer),
)
video_author = Table(
"sh_dm_video_author",
_meta,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("u_id", String(64), nullable=True, comment="用户内部ID"),
Column("u_xid", String(64), nullable=False, unique=True, comment="用户外部ID如第三方ID"),
Column("u_name", String(64), nullable=False, comment="用户名"),
Column("u_pic", String(255), nullable=True, comment="用户头像URL"),
Column("follow_number", Integer, nullable=True, default=0, comment="粉丝量"),
Column("v_number", Integer, nullable=True, default=0, comment="用户发布的视频总数"),
Column("pv_number", Integer, nullable=True, default=0, comment="盗版视频数"),
Column("b_number", Integer, nullable=True, default=0, comment="打击视频数"),
Column("create_time", DateTime, nullable=False, comment="入库时间"),
Column("update_time", Integer, nullable=True, comment="更新时间UNIX 时间戳)"),
)
def mysql_retry(max_retries: int = 3, base_delay: float = 2.0):
@ -587,23 +602,103 @@ class DBSA:
if not op_rows and not vid_rows:
return
for r in vid_rows:
r.pop("is_repeat", None)
r.pop("level", None)
start = time.time()
try:
cls._bulk_insert(op_rows)
cls._bulk_upsert(vid_rows)
logger.info(f"[DBSA] 成 op={len(op_rows)} video={len(vid_rows)} time={time.time() - start:.3f}s")
except Exception as e:
logger.info(f"[DBSA] flush FAIL: {e} op={len(op_rows)} video={len(vid_rows)}")
# 批量退回原始 payload字段最全
existing_vid_ids = set()
if vid_rows:
all_v_ids = list({row["v_id"] for row in vid_rows})
conn = _engine.connect()
try:
cls.push_record_many(payloads)
except Exception as re:
logger.info("[Redis 回退失败]", re)
sel_vid = (
video.select()
.with_only_columns([video.c.v_id])
.where(video.c.v_id.in_(all_v_ids))
)
existing_vid_ids = {row.v_id for row in conn.execute(sel_vid).fetchall()}
except Exception as e:
logger.info(f"[DBSA] 查询 video 表时异常: {e}")
try:
cls.push_record_many(payloads)
except Exception as re:
logger.info("[Redis 回退失败]", re)
return
finally:
conn.close()
for i, vid_row in enumerate(vid_rows):
if vid_row["v_id"] in existing_vid_ids:
op_rows[i]["is_repeat"] = 1 # 标记为更新
else:
op_rows[i]["is_repeat"] = 2 # 标记为插入
authors_map = {}
now_ts = int(time.time())
for data in payloads:
u_xid = data.get("u_xid")
if not u_xid:
continue
authors_map[u_xid] = {
"u_id": data.get("u_id"),
"u_xid": u_xid,
"u_name": data.get("u_name"),
"u_pic": data.get("u_pic"),
"follow_number": data.get("fans", 0),
"v_number": data.get("videos", 0),
"pv_number": 0,
"b_number": 0,
"create_time": datetime.utcnow(),
"update_time": now_ts
}
author_rows = list(authors_map.values())
if author_rows:
stmt_author = mysql_insert(video_author).values(author_rows)
upd_author = {
"u_name": stmt_author.inserted.u_name,
"u_pic": stmt_author.inserted.u_pic,
"follow_number": stmt_author.inserted.follow_number,
"v_number": stmt_author.inserted.v_number,
"pv_number": stmt_author.inserted.pv_number,
"b_number": stmt_author.inserted.b_number,
"update_time": stmt_author.inserted.update_time,
}
ondup_author = stmt_author.on_duplicate_key_update(**upd_author)
try:
with _engine.begin() as conn2:
conn2.execute(ondup_author)
except Exception as e:
logger.info(f"[DBSA] 写作者表失败: {e}")
try:
cls.push_record_many(payloads)
except Exception as re:
logger.info("[Redis 回退失败]", re)
return
if op_rows:
try:
cls._bulk_insert(op_rows)
except Exception as e:
logger.info(f"[DBSA] 写日志表失败: {e}")
try:
cls.push_record_many(payloads)
except Exception as re:
logger.info("[Redis 回退失败]", re)
return
for vid_row in vid_rows:
vid_row.pop("is_repeat", None)
vid_row.pop("level", None)
if vid_rows:
try:
cls._bulk_upsert(vid_rows)
logger.info(
f"[DBSA] flush 完成authors={len(author_rows)}op={len(op_rows)}video={len(vid_rows)}"
)
except Exception as e:
logger.info(f"[DBSA] 写视频表失败: {e} op={len(op_rows)} video={len(vid_rows)}")
try:
cls.push_record_many(payloads)
except Exception as re:
logger.info("[Redis 回退失败]", re)
@classmethod
def update_video_stats(cls, locator: dict, stats: dict) -> int: