diff --git a/DB.py b/DB.py
index 470cee6..0640def 100644
--- a/DB.py
+++ b/DB.py
@@ -1,17 +1,17 @@
 import json
 import redis
 import pymysql
-import time
 import functools
 from redis.exceptions import ConnectionError, TimeoutError
 import time, copy, threading
 from typing import List, Dict
 from sqlalchemy import (
     create_engine, MetaData, Table, Column,
-    BigInteger, Integer, String, Text
+    BigInteger, Integer, String, Text, DateTime
 )
 from sqlalchemy.dialects.mysql import insert as mysql_insert
 from logger import logger
+from datetime import datetime
 
 MYSQL_URL = (
     "mysql+pymysql://db_vidcon:rexdK4fhCCiRE4BZ"
@@ -67,6 +67,21 @@ video = Table("sh_dm_video_v2", _meta,
     Column("follow_number", Integer),
     Column("video_number", Integer),
 )
+video_author = Table(
+    "sh_dm_video_author",
+    _meta,
+    Column("id", Integer, primary_key=True, autoincrement=True),
+    Column("u_id", String(64), nullable=True, comment="用户内部ID"),
+    Column("u_xid", String(64), nullable=False, unique=True, comment="用户外部ID(如第三方ID)"),
+    Column("u_name", String(64), nullable=False, comment="用户名"),
+    Column("u_pic", String(255), nullable=True, comment="用户头像URL"),
+    Column("follow_number", Integer, nullable=True, default=0, comment="粉丝量"),
+    Column("v_number", Integer, nullable=True, default=0, comment="用户发布的视频总数"),
+    Column("pv_number", Integer, nullable=True, default=0, comment="盗版视频数"),
+    Column("b_number", Integer, nullable=True, default=0, comment="打击视频数"),
+    Column("create_time", DateTime, nullable=False, comment="入库时间"),
+    Column("update_time", Integer, nullable=True, comment="更新时间(UNIX 时间戳)"),
+)
 
 
 def mysql_retry(max_retries: int = 3, base_delay: float = 2.0):
@@ -587,23 +602,103 @@ class DBSA:
         if not op_rows and not vid_rows:
             return
 
-        for r in vid_rows:
-            r.pop("is_repeat", None)
-            r.pop("level", None)
-
-        start = time.time()
-        try:
-            cls._bulk_insert(op_rows)
-            cls._bulk_upsert(vid_rows)
-            logger.info(f"[DBSA] 成 op={len(op_rows)} video={len(vid_rows)} time={time.time() - start:.3f}s")
-
-        except Exception as e:
-            logger.info(f"[DBSA] flush FAIL: {e} op={len(op_rows)} video={len(vid_rows)}")
-            # 批量退回原始 payload,字段最全
-            try:
-                cls.push_record_many(payloads)
-            except Exception as re:
-                logger.info("[Redis 回退失败]", re)
+        existing_vid_ids = set()
+        if vid_rows:
+            all_v_ids = list({row["v_id"] for row in vid_rows})
+            conn = _engine.connect()
+            try:
+                sel_vid = (
+                    video.select()
+                    .with_only_columns([video.c.v_id])
+                    .where(video.c.v_id.in_(all_v_ids))
+                )
+                existing_vid_ids = {row.v_id for row in conn.execute(sel_vid).fetchall()}
+            except Exception as e:
+                logger.info(f"[DBSA] 查询 video 表时异常: {e}")
+                try:
+                    cls.push_record_many(payloads)
+                except Exception as re:
+                    logger.info("[Redis 回退失败]", re)
+                return
+            finally:
+                conn.close()
+
+        for i, vid_row in enumerate(vid_rows):
+            if vid_row["v_id"] in existing_vid_ids:
+                op_rows[i]["is_repeat"] = 1  # 标记为更新
+            else:
+                op_rows[i]["is_repeat"] = 2  # 标记为插入
+
+        authors_map = {}
+        now_ts = int(time.time())
+        for data in payloads:
+            u_xid = data.get("u_xid")
+            if not u_xid:
+                continue
+            authors_map[u_xid] = {
+                "u_id": data.get("u_id"),
+                "u_xid": u_xid,
+                "u_name": data.get("u_name"),
+                "u_pic": data.get("u_pic"),
+                "follow_number": data.get("fans", 0),
+                "v_number": data.get("videos", 0),
+                "pv_number": 0,
+                "b_number": 0,
+                "create_time": datetime.utcnow(),
+                "update_time": now_ts
+            }
+
+        author_rows = list(authors_map.values())
+        if author_rows:
+            stmt_author = mysql_insert(video_author).values(author_rows)
+            upd_author = {
+                "u_name": stmt_author.inserted.u_name,
+                "u_pic": stmt_author.inserted.u_pic,
+                "follow_number": stmt_author.inserted.follow_number,
+                "v_number": stmt_author.inserted.v_number,
+                "pv_number": stmt_author.inserted.pv_number,
+                "b_number": stmt_author.inserted.b_number,
+                "update_time": stmt_author.inserted.update_time,
+            }
+            ondup_author = stmt_author.on_duplicate_key_update(**upd_author)
+            try:
+                with _engine.begin() as conn2:
+                    conn2.execute(ondup_author)
+            except Exception as e:
+                logger.info(f"[DBSA] 写作者表失败: {e}")
+                try:
+                    cls.push_record_many(payloads)
+                except Exception as re:
+                    logger.info("[Redis 回退失败]", re)
+                return
+
+        if op_rows:
+            try:
+                cls._bulk_insert(op_rows)
+            except Exception as e:
+                logger.info(f"[DBSA] 写日志表失败: {e}")
+                try:
+                    cls.push_record_many(payloads)
+                except Exception as re:
+                    logger.info("[Redis 回退失败]", re)
+                return
+
+        for vid_row in vid_rows:
+            vid_row.pop("is_repeat", None)
+            vid_row.pop("level", None)
+
+        if vid_rows:
+            try:
+                cls._bulk_upsert(vid_rows)
+                logger.info(
+                    f"[DBSA] flush 完成:authors={len(author_rows)} 条,op={len(op_rows)} 条,video={len(vid_rows)} 条"
+                )
+            except Exception as e:
+                logger.info(f"[DBSA] 写视频表失败: {e} op={len(op_rows)} video={len(vid_rows)}")
+                try:
+                    cls.push_record_many(payloads)
+                except Exception as re:
+                    logger.info("[Redis 回退失败]", re)
 
     @classmethod
     def update_video_stats(cls, locator: dict, stats: dict) -> int: