import json
import redis
import pymysql
import functools
import time
import copy
import threading
from typing import Dict, Optional
from queue import Queue, Empty
from datetime import datetime

from redis.exceptions import ConnectionError, TimeoutError
from sqlalchemy import (
    create_engine, MetaData, Table, Column,
    Integer, String, DateTime,
)
from sqlalchemy.dialects.mysql import insert as mysql_insert

from logger import logger

MYSQL_URL = (
    "mysql+pymysql://db_vidcon:rexdK4fhCCiRE4BZ"
    "@192.144.230.75:3306/db_vidcon?charset=utf8mb4"
)

_engine = create_engine(
    MYSQL_URL,
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
    pool_recycle=3600,
    future=True,
)
_meta = MetaData()

# Operation-log table
video_op = Table(
    "sh_dm_video_op_v3", _meta,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("v_id", String(64)),
    Column("v_xid", String(64)),
    Column("a_id", Integer, default=0),
    Column("level", Integer),
    Column("name_title", String(100)),
    Column("keyword", String(100)),
    Column("rn", String(50)),
    Column("history_status", String(100)),
    Column("is_repeat", Integer),
    Column("is_piracy", String(2), default='3'),
    Column("sort", Integer),
    Column("createtime", Integer),
    Column("updatetime", Integer),
    Column("operatetime", Integer),
    Column("batch", Integer),
    Column("machine", Integer),
    Column("ts_status", Integer, default=1),
)

# Video table
video = Table(
    "sh_dm_video_v3", _meta,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("v_id", String(64)),
    Column("v_name", String(100), nullable=False),
    Column("v_xid", String(64), nullable=False),
    Column("title", String(255), nullable=False),
    Column("link", String(255), nullable=False),
    Column("edition", String(255), default=''),
    Column("duration", String(11), default='0'),
    Column("watch_number", Integer, default=0),
    Column("follow_number", Integer, default=0),
    Column("video_number", Integer, default=0),
    Column("public_time", DateTime),
    Column("cover_pic", String(255)),
    Column("sort", Integer),
    Column("u_xid", String(64)),
    Column("u_id", String(100)),
    Column("u_pic", String(255)),
    Column("u_name", String(255)),
    Column("status", Integer, default=1),
    Column("createtime", Integer, default=0),
    Column("updatetime", Integer, default=0),
    Column("operatetime", Integer, default=0),
)

# Author table
video_author = Table(
    "sh_dm_video_author", _meta,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("u_id", String(64), nullable=True, comment="internal user ID"),
    Column("u_xid", String(64), nullable=False, unique=True, comment="external user ID (e.g. third-party ID)"),
    Column("u_name", String(64), nullable=False, comment="user name"),
    Column("u_pic", String(255), nullable=True, comment="avatar URL"),
    Column("follow_number", Integer, nullable=True, default=0, comment="follower count"),
    Column("v_number", Integer, nullable=True, default=0, comment="total videos published by the user"),
    Column("pv_number", Integer, nullable=True, default=0, comment="pirated-video count"),
    Column("b_number", Integer, nullable=True, default=0, comment="taken-down video count"),
    Column("create_time", DateTime, nullable=False, comment="insert time"),
    Column("update_time", Integer, nullable=True, comment="update time (UNIX timestamp)"),
)
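
# The Table objects above are Python-side metadata only; nothing in this module
# issues DDL. A minimal bootstrap sketch (assumptions: `init_schema` is a
# hypothetical helper not called anywhere in this module, and the configured
# account has CREATE TABLE rights):
def init_schema() -> None:
    """Emit CREATE TABLE for the tables defined above.

    checkfirst=True skips tables that already exist, so repeated calls are
    harmless.
    """
    _meta.create_all(_engine, checkfirst=True)
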
def mysql_retry(max_retries: int = 3, base_delay: float = 1.0):
    RETRIABLE_ERRORS = {2013, 1213, 2006}

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(self, *args, **kwargs):
            last_exc = None
            for attempt in range(1, max_retries + 1):
                try:
                    # Make sure the connection is still alive; reconnect automatically on failure.
                    self.conn.ping(reconnect=True)
                    return fn(self, *args, **kwargs)
                except pymysql.OperationalError as e:
                    errno = e.args[0]
                    if errno not in RETRIABLE_ERRORS:
                        raise
                    last_exc = e
                    reason = {
                        2013: "connection lost",
                        1213: "deadlock",
                        2006: "server gone away",
                    }.get(errno, f"MySQL error {errno}")
                    wait = base_delay * (2 ** (attempt - 1))
                    logger.warning(
                        f"[MySQL][{fn.__name__}] retry {attempt} ({errno} {reason}): {e}, "
                        f"waiting {wait:.1f}s..."
                    )
                    # Only rebuild the connection for connection-level errors.
                    if errno in {2013, 2006}:
                        self._reconnect_mysql()
                    time.sleep(wait)
            logger.error(
                f"[MySQL] `{fn.__name__}` still failing after {max_retries} retries, "
                f"last exception: {last_exc}"
            )
            raise last_exc
        return wrapper
    return decorator


def redis_retry(max_retries: int = 3):
    """Decorator factory: retry a Redis operation up to `max_retries` times."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(self, *args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return fn(self, *args, **kwargs)
                except (ConnectionError, TimeoutError) as e:
                    logger.info(f"[Redis][{fn.__name__}] attempt {attempt} failed: {e}")
                    self.reconnect_redis()
                    if attempt == max_retries:
                        logger.info("[Redis] connection failed for good")
                        raise
                    logger.info(f"[Redis] reconnected, starting attempt {attempt + 1}...")
        return wrapper
    return decorator
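
# Usage sketch for the two retry decorators (illustrative; `DemoDAO`,
# `count_videos`, and `queue_depth` are hypothetical names). @mysql_retry
# expects the instance to expose `self.conn`/`self.cursor` and a
# `_reconnect_mysql()` method; @redis_retry expects `self.redis` and
# `reconnect_redis()`. DBVidcon below provides all of these, so the sketch
# simply subclasses it:
#
# class DemoDAO(DBVidcon):
#     @mysql_retry(max_retries=3, base_delay=1.0)
#     def count_videos(self):
#         self.cursor.execute("SELECT COUNT(*) AS n FROM sh_dm_video_v3")
#         return self.cursor.fetchone()["n"]
#
#     @redis_retry(max_retries=3)
#     def queue_depth(self):
#         return self.redis.llen(self.l1_list_key)
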
class DBVidcon:
    _MYSQL_CONF = {
        "host": "192.144.230.75",  # "127.0.0.1",
        "port": 3306,  # 3307,
        "user": "db_vidcon",
        "password": "rexdK4fhCCiRE4BZ",
        "database": "db_vidcon",
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
    }
    _REDIS_CONF = {
        "host": "192.144.230.75",  # "127.0.0.1",
        "port": 6379,  # 6380,
        "password": "qwert@$123!&",
        "decode_responses": True,
    }

    def __init__(self):
        self.l0_list_key = "video_l0_queue"
        self.l1_list_key = "video_l1_queue"
        self.l2_list_key = "video_l2_queue"
        self.report_list = "report_queue"
        self.error_list_key = "error_save_queue"
        self.conn = pymysql.connect(**self._MYSQL_CONF)
        self.cursor = self.conn.cursor()
        self.redis = redis.Redis(**self._REDIS_CONF)

    def _reconnect_mysql(self):
        """Rebuild the MySQL connection and cursor (used by @mysql_retry on 2006/2013)."""
        try:
            if self.cursor:
                self.cursor.close()
            if self.conn:
                self.conn.close()
        except Exception:
            pass
        self.conn = pymysql.connect(**self._MYSQL_CONF)
        self.cursor = self.conn.cursor()

    def _connect_redis(self):
        """Create (or re-create) the Redis client."""
        self.redis = redis.Redis(**self._REDIS_CONF)

    def reconnect_redis(self):
        """Reconnect Redis after a ConnectionError is caught."""
        try:
            self._connect_redis()
        except Exception as e:
            logger.info(f"[Redis reconnect error] {e}")
            time.sleep(2)

    @redis_retry(max_retries=3)
    def push_record(self, data: dict):
        raw = json.dumps(data, ensure_ascii=False)
        self.redis.lpush(self.error_list_key, raw)

    @redis_retry(max_retries=3)
    def fetch_from_redis(self, count: int = 100, list_key: str = None):
        key = list_key
        if key is None:
            return []
        try:
            raws = self.redis.lpop(key, count)
        except TypeError:
            # Older redis-py versions do not accept a count for lpop;
            # fall back to popping one item at a time from the same end.
            raws = []
            for _ in range(count):
                item = self.redis.lpop(key)
                if item is None:
                    break
                raws.append(item)
        except redis.exceptions.ConnectionError as e:
            logger.info(f"[Redis pop error] {e}")
            self.reconnect_redis()
            return []
        if not raws:
            return []
        if isinstance(raws, str):
            raws = [raws]
        out = []
        for raw in raws:
            try:
                out.append((raw, json.loads(raw)))
            except json.JSONDecodeError:
                continue
        return out

    @mysql_retry()
    def get_account_info(self, mid: str):
        sql = "SELECT `account`,`password` FROM sh_site_accounts WHERE id = %s AND status=1 LIMIT 1"
        self.cursor.execute(sql, (mid,))
        return self.cursor.fetchone()

    @redis_retry(max_retries=3)
    def push_l0(self, raws):
        """Push data onto the l0 (urgent) queue."""
        if isinstance(raws, str):
            raws = [raws]
        self.redis.lpush(self.l0_list_key, *raws)
        logger.info(f"[push l0] queued {len(raws)} item(s)")

    @redis_retry(max_retries=3)
    def push_l1(self, payloads):
        """Push data onto the l1 (normal) queue."""
        if isinstance(payloads, str):
            payloads = [payloads]
        self.redis.rpush(self.l1_list_key, *payloads)
        logger.info(f"[push l1] queued {len(payloads)} item(s)")

    @redis_retry(max_retries=3)
    def push_l2(self, raws):
        """Push data onto the l2 (low-priority) queue."""
        if isinstance(raws, str):
            raws = [raws]
        self.redis.lpush(self.l2_list_key, *raws)
        logger.info(f"[push l2] queued {len(raws)} item(s)")

    @redis_retry(max_retries=3)
    def push_report(self, raws):
        """Atomic operation: clear the report list and push the new data."""
        if isinstance(raws, str):
            raws = [raws]
        with self.redis.pipeline() as pipe:
            pipe.multi()                       # start the transaction
            pipe.delete(self.report_list)      # drop the old list
            if raws:                           # push new data, if any
                pipe.rpush(self.report_list, *raws)
            pipe.execute()                     # commit the transaction
        if raws:
            logger.info(f"[push report] atomically cleared and queued {len(raws)} new item(s)")
        else:
            logger.info("[push report] atomically cleared the list")

    @redis_retry(max_retries=3)
    def get_proxy_agent_dict(self) -> dict:
        try:
            meta_json = self.redis.hget("proxy_config", "meta")
        except Exception as e:
            logger.info("[Redis get_proxy_agent_dict error]", exc_info=e)
            self.reconnect_redis()
            return {}
        if not meta_json:
            return {}
        try:
            return json.loads(meta_json)
        except (json.JSONDecodeError, TypeError) as e:
            logger.warning("[Proxy meta JSON decode error]", exc_info=e)
            return {}

    @mysql_retry()
    def get_proxy_parameter(self, rn: str) -> str:
        sql = "SELECT parameter FROM proxy_agent WHERE rn = %s LIMIT 1"
        self.cursor.execute(sql, (rn,))
        result = self.cursor.fetchone()
        logger.info(result)
        return result['parameter'] if result else None

    @redis_retry(max_retries=3)
    def item_report(self, count: int = 100):
        try:
            items = self.fetch_from_redis(count, list_key=self.report_list)
        except Exception as e:
            logger.info(f"[Redis report pop error] {e}")
            self.reconnect_redis()
            items = []
        return items

    @redis_retry(max_retries=3)
    def item_keyword(self, count: int = 20):
        try:
            items = self.fetch_from_redis(count, list_key=self.l0_list_key)
        except Exception as e:
            logger.info(f"[Redis l0 pop error] {e}")
            self.reconnect_redis()
            items = []
        if items:
            return items, 0
        try:
            items = self.fetch_from_redis(count, list_key=self.l1_list_key)
        except Exception as e:
            logger.info(f"[Redis l1 pop error] {e}")
            self.reconnect_redis()
            items = []
        if items:
            return items, 1
        try:
            items = self.fetch_from_redis(count, list_key=self.l2_list_key)
        except Exception as e:
            logger.info(f"[Redis l2 pop error] {e}")
            self.reconnect_redis()
            items = []
        if items:
            return items, 2
        logger.info("[Redis queues empty] all queues are empty")
        return items, 99

    @redis_retry(max_retries=3)
    def rollback_l1(self, payloads):
        if isinstance(payloads, str):
            payloads = [payloads]
        self.redis.rpush(self.l1_list_key, *payloads)
        logger.info(f"[rollback l1] returned {len(payloads)} item(s)")

    @redis_retry(max_retries=3)
    def rollback_l0(self, raws):
        if isinstance(raws, str):
            raws = [raws]
        self.redis.lpush(self.l0_list_key, *raws)
        logger.info(f"[rollback l0] returned {len(raws)} item(s)")

    @redis_retry(max_retries=3)
    def rollback_l2(self, raws):
        if isinstance(raws, str):
            raws = [raws]
        self.redis.lpush(self.l2_list_key, *raws)
        logger.info(f"[rollback l2] returned {len(raws)} item(s)")

    @mysql_retry()
    def get_report_video(self):
        sql = """
            SELECT id, name_title, link
            FROM sh_dm_fight_records
            WHERE status = 1
        """
        self.cursor.execute(sql)
        return self.cursor.fetchall()

    @mysql_retry()
    def get_subsequent_report_video(self, did: int):
        sql = """
            SELECT DISTINCT report_id
            FROM sh_dm_fight_records
            WHERE status = 2
              AND subsequent_status = 1
              AND report_time != ''
              AND mid = %s
        """
        self.cursor.execute(sql, (did,))
        return self.cursor.fetchall()

    @mysql_retry()
    def getreport_video(self):
        sql = """
            SELECT id, v_xid
            FROM sh_dm_fight_records
            WHERE is_removed = '' OR is_removed IS NULL OR is_removed = 0
        """
        self.cursor.execute(sql)
        return self.cursor.fetchall()

    @mysql_retry()
    def mark_video_removed(self, d_id: int, removed_flag: int = 1):
        sql = """
            UPDATE sh_dm_fight_records
            SET is_removed = %s
            WHERE id = %s
        """
        self.cursor.execute(sql, (removed_flag, d_id))
        self.flush()
    @mysql_retry()
    def update_fight_record_status(self, ids, report_id: int, new_status: int, errinfo: str = "",
                                   report_time: int = 0, subsequent_status: int = 1, mid=0):
        if not ids:
            return  # empty list, nothing to do
        placeholders = ','.join(['%s'] * len(ids))
        sql = f"""
            UPDATE sh_dm_fight_records
            SET status = %s,
                errinfo = %s,
                updata_time = %s,
                report_id = %s,
                subsequent_status = %s,
                report_time = %s,
                mid = %s
            WHERE id IN ({placeholders})
        """
        now_ts = int(time.time())
        params = [new_status, errinfo, now_ts, report_id, subsequent_status, report_time, mid] + list(ids)
        # NOTE: no commit here; the caller is expected to call flush().
        self.cursor.execute(sql, params)

    @mysql_retry()
    def update_subsequent_status_by_report_id(self, report_id: int, new_status: int, info: str = ""):
        sql = """
            UPDATE sh_dm_fight_records
            SET subsequent_status = %s,
                updata_time = UNIX_TIMESTAMP(),
                subsequent_info = %s
            WHERE report_id = %s
        """
        self.cursor.execute(sql, (new_status, info, report_id))
        self.flush()

    @mysql_retry()
    def update_video_ts_status(self):
        sql = """
            UPDATE sh_dm_video_v2 v
            JOIN sh_dm_video_author a ON v.u_xid = a.u_xid
            SET v.ts_status = 3
            WHERE a.white_status = 1;
        """
        self.cursor.execute(sql)
        self.flush()
        logger.info("[update video ts_status] done")

    @mysql_retry()
    def upsert_video(self, data: dict):
        logger.info(f"DB upsert -> {data.get('v_xid')}, level -> {data.get('level')}")
        data.setdefault("a_id", 0)
        data.setdefault("history_status", "")
        data.setdefault("is_piracy", 3)
        data.setdefault("is_repeat", 3)
        data["sort"] = data.get("index", 0)

        max_retries = 1  # one extra retry after the first attempt
        attempt = 0
        while True:
            try:
                sql_op = """
                    INSERT INTO sh_dm_video_op_v2 (
                        v_id, v_xid, a_id, level, name_title,
                        keyword, rn, history_status, is_repeat,
                        sort, createtime, updatetime, batch, machine
                    ) VALUES (
                        %(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s,
                        %(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s,
                        %(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s
                    )
                """
                self.cursor.execute(sql_op, data)
                sql_update = """
                    INSERT INTO sh_dm_video_v2 (
                        v_id, v_xid, rn, v_name, title, link, edition, duration,
                        public_time, cover_pic, sort,
                        u_xid, u_id, u_pic, u_name,
                        status, createtime, updatetime
                    ) VALUES (
                        %(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s, '', %(duration)s,
                        %(create_time)s, %(cover_pic)s, %(sort)s,
                        %(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s,
                        1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP()
                    )
                    ON DUPLICATE KEY UPDATE
                        title = VALUES(title),
                        duration = VALUES(duration),
                        cover_pic = VALUES(cover_pic),
                        sort = VALUES(sort),
                        updatetime = UNIX_TIMESTAMP();
                """
                self.cursor.execute(sql_update, data)
                break  # success: leave the retry loop
            except Exception as e:
                # Roll back the uncommitted changes from this attempt.
                self.conn.rollback()
                logger.info(f"[DB write error] {e}")
                logger.info(f"[offending data]: {data}")
                if attempt < max_retries:
                    attempt += 1
                    logger.info(f"retry {attempt}...")
                    continue
                # Still failing after the retry: park the record in Redis for later.
                logger.info("retries exhausted, pushing the record to Redis for later processing")
                self.push_record(data)
                logger.info("[handed over to Redis]")
                break

    @mysql_retry()
    def fetch_keyword_title(self, level: int = 99):
        sql = """
            SELECT k.keyword,
                   k.rn,
                   t.title AS v_name,
                   ANY_VALUE(t.level) AS level
            FROM sh_dm_keyword k
            LEFT JOIN sh_dm_title t ON k.title = t.title
            WHERE k.status = 1
              AND t.status = 1
              AND NOT EXISTS (
                  SELECT 1
                  FROM sh_dm_black_keyword b
                  WHERE (
                      (b.keyword IS NULL OR b.keyword = '') AND b.title = t.title
                  ) OR (
                      b.keyword IS NOT NULL AND b.keyword != '' AND b.keyword = k.keyword
                  )
              )
              AND t.level = %s
            GROUP BY k.keyword, k.rn;
        """
        self.cursor.execute(sql, (level,))
        return self.cursor.fetchall()
    @mysql_retry()
    def log_batch_start(self, info: Dict) -> Optional[int]:
        batch = info.get("batch")
        level = info.get("level")
        if batch is None or level is None:
            raise ValueError("info must contain 'batch' and 'level'")
        count = info.get("count", 0)
        if level == 0:
            t0, t1, t2 = count, 0, 0
        elif level == 1:
            t0, t1, t2 = 0, count, 0
        elif level == 9:
            level = 2
            t0, t1, t2 = 0, 0, count
        else:
            raise ValueError(f"unsupported level: {level}")
        start_ts = int(time.time())
        sql = """
            INSERT INTO sh_dm_batch_log (batch, info, t0, t1, t2, starttime)
            VALUES (%s, %s, %s, %s, %s, %s)
        """
        try:
            self.cursor.execute(sql, (batch, level, t0, t1, t2, start_ts))
            self.conn.commit()
            return self.cursor.lastrowid
        except Exception as e:
            logger.info(f"[log_batch_start] insert failed: {e}")
            return None

    @mysql_retry()
    def flush(self):
        """Commit manually after a batch of statements."""
        self.conn.commit()

    def close(self):
        try:
            if self.cursor:
                self.cursor.close()
        finally:
            if self.conn:
                self.conn.close()
        try:
            if hasattr(self, "redis") and self.redis:
                # redis-py >= 4.2 prefers an explicit .close()
                if hasattr(self.redis, "close"):
                    self.redis.close()
                else:
                    self.redis.connection_pool.disconnect()
        except Exception as e:
            logger.info(f"[Redis close error] {e}")

    @redis_retry(max_retries=3)
    def get_proxy(self, region_code: str) -> str:
        region_code = region_code.upper()
        list_json_str = self.redis.hget("proxy_config", "list") or "[]"
        conf_str = self.redis.hget("proxy_config", "conf") or "0"
        try:
            proxies = json.loads(list_json_str)
            idx = int(conf_str)
            entry = proxies[idx]
            result = entry.get("data", {}).get(region_code, "")
            if result and not result.startswith("http://"):
                result = "http://" + result
            return result
        except (ValueError, IndexError, KeyError, json.JSONDecodeError):
            return ""

    @redis_retry(max_retries=3)
    def queues_empty(self) -> bool:
        """Return True only if all three queues are empty; False if any has items."""
        return (
            self.redis.llen(self.l0_list_key) == 0
            and self.redis.llen(self.l1_list_key) == 0
            and self.redis.llen(self.l2_list_key) == 0
        )

    @redis_retry()
    def l0_empty(self) -> bool:
        return self.redis.llen(self.l0_list_key) == 0

    @redis_retry()
    def l1_empty(self) -> bool:
        return self.redis.llen(self.l1_list_key) == 0

    @redis_retry()
    def l2_empty(self) -> bool:
        return self.redis.llen(self.l2_list_key) == 0

    @redis_retry(max_retries=3)
    def pop_error_item(self):
        """Pop one error record from error_list_key (lpop); return None when the queue is empty."""
        item = self.redis.lpop(self.error_list_key)
        # Records are stored as JSON strings, so deserialize here.
        return json.loads(item) if item is not None else None
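
# Usage sketch for DBVidcon (illustrative only; each payload is whatever
# json.dumps produced upstream, so the processing step is left as a
# placeholder):
#
# db = DBVidcon()
# try:
#     items, level = db.item_keyword(count=20)  # [(raw, parsed), ...] plus queue level
#     for raw, payload in items:
#         pass                                  # process the payload here
#     db.flush()                                # commit any pending SQL writes
# finally:
#     db.close()
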
class DBSA:
    # ======= tunables =======
    FLUSH_EVERY_ROWS = 100      # row-count threshold
    FLUSH_INTERVAL = 30         # seconds threshold
    MAX_SQL_RETRY = 3           # per-statement deadlock retries
    SQL_RETRY_BASE_SLEEP = 0.5  # backoff base for those retries
    FLUSH_RETRY = 3             # whole-flush rounds
    DELAY_ON_FAIL = 10          # wait between failed flush rounds
    DEADLOCK_ERRNO = 1213       # MySQL deadlock error code
    LOCK_TIMEOUT = 3            # mutex acquire timeout
    # ========================

    # ----- buffers -----
    _buf_op = []
    _buf_vid = []
    _buf_payload = []
    _last_flush = time.time()

    # ----- concurrency control -----
    _lock = threading.Lock()
    _existing_op_keys = set()
    _existing_vid_keys = set()

    # ----- queue / background-thread mode -----
    _queue_mode = False
    _queue = Queue()

    # ================== stand-in for pushing records back to Redis ==================
    @staticmethod
    def push_record_many(rows):
        logger.warning("[push back to Redis] cnt=%d", len(rows))

    # ----------------------------------------------------
    # main public entry point
    # ----------------------------------------------------
    @classmethod
    def upsert_video(cls, data: dict):
        if cls._queue_mode:
            cls._queue.put(data)
            return

        # ---------- deep-copy the payload / fill defaults ----------
        data = copy.deepcopy(data)
        data.setdefault("a_id", 0)
        data.setdefault("is_repeat", 3)
        data.setdefault("keyword", "")
        data["sort"] = data.get("index", 0)

        now_ts = int(time.time())
        op_index_key = (data["v_xid"] or "", data["keyword"] or "", now_ts)
        vid_index_key = (data["v_xid"] or "", data["v_name"] or "")

        # ---------- 1) take the mutex ----------
        if not cls._lock.acquire(timeout=cls.LOCK_TIMEOUT):
            logger.error("[upsert_video] timed out acquiring cls._lock after %ds", cls.LOCK_TIMEOUT)
            return
        try:
            # ---------- 2) dedupe ----------
            if op_index_key in cls._existing_op_keys or vid_index_key in cls._existing_vid_keys:
                return

            # ---------- 3) build op_row ----------
            op_row = dict(
                v_id=data["v_id"],
                v_xid=data["v_xid"],
                a_id=data["a_id"],
                level=data.get("level", 0),
                name_title=data["v_name"],
                keyword=data["keyword"],
                is_repeat=data["is_repeat"],
                sort=data["sort"],
                createtime=now_ts,  # set on first insert
                updatetime=now_ts,  # the only column later updates touch
                batch=data.get("batch", 0),
                machine=data.get("machine_id", 0),
                is_piracy=data.get("is_piracy", '3'),
                ts_status=data.get("ts_status", 1),
                rn=data.get("rn", ""),
            )

            # ---------- 4) build vid_row ----------
            vid_row = dict(
                v_id=data["v_id"],
                v_xid=data["v_xid"],
                title=data["title"],
                v_name=data["v_name"],
                link=data["link"],
                edition="",
                duration=str(data["duration"]) if data.get("duration") else '0',
                public_time=data["create_time"],
                cover_pic=data["cover_pic"],
                sort=data["sort"],
                u_xid=data["u_xid"],
                u_id=data["u_id"],
                u_pic=data["u_pic"],
                u_name=data["u_name"],
                status=1,
                createtime=now_ts,
                updatetime=now_ts,
                watch_number=data.get("view", 0),
                follow_number=data.get("fans", 0),
                video_number=data.get("videos", 0),
            )
            # keep only columns that actually exist on the video table
            vid_row = {k: v for k, v in vid_row.items() if k in video.c}

            # ---------- 5) buffer ----------
            cls._buf_op.append(op_row)
            cls._buf_vid.append(vid_row)
            cls._buf_payload.append(data)
            cls._existing_op_keys.add(op_index_key)
            cls._existing_vid_keys.add(vid_index_key)
        finally:
            cls._lock.release()

        # ---------- 6) decide whether to flush ----------
        if (len(cls._buf_vid) >= cls.FLUSH_EVERY_ROWS
                or time.time() - cls._last_flush >= cls.FLUSH_INTERVAL):
            logger.info("flush: row-count or time threshold reached, writing to DB")
            cls.flush()

    # ----------------------------------------------------
    # safe single-statement execution: deadlock spin + pool logging
    # ----------------------------------------------------
    @classmethod
    def _safe_execute(cls, statement, desc=""):
        for attempt in range(cls.MAX_SQL_RETRY):
            try:
                logger.debug("[%s] borrowing a connection", desc)
                with _engine.begin() as conn:
                    logger.debug("[%s] connection acquired", desc)
                    conn.execute(statement)
                return
            except Exception as e:
                err_no = getattr(getattr(e, "orig", None), "args", [None])[0]
                if err_no == cls.DEADLOCK_ERRNO and attempt < cls.MAX_SQL_RETRY - 1:
                    time.sleep(cls.SQL_RETRY_BASE_SLEEP * (attempt + 1))
                    logger.warning("[%s] deadlock, retry %d/%d", desc, attempt + 1, cls.MAX_SQL_RETRY)
                    continue
                logger.exception("[%s] SQL execution failed", desc)
                raise

    # ----------------------------------------------------
    # outer flush: retries the whole batch
    # ----------------------------------------------------
    @classmethod
    def flush(cls):
        for round_no in range(1, cls.FLUSH_RETRY + 1):
            try:
                cls._flush_once()
                return
            except Exception as e:
                logger.error("[flush] round %d failed: %s", round_no, e)
                if round_no < cls.FLUSH_RETRY:
                    logger.info("[flush] waiting %ds before retrying...", cls.DELAY_ON_FAIL)
                    time.sleep(cls.DELAY_ON_FAIL)
                else:
                    # NOTE: _flush_once clears the buffers before writing, so rows
                    # from the failed round are no longer in _buf_payload here.
                    logger.error("[flush] %d consecutive rounds failed, pushing back to Redis", cls.FLUSH_RETRY)
                    cls.push_record_many(cls._buf_payload)
                    cls._clear_buffers()
                    return
    # ----------------------------------------------------
    # the actual DB write
    # ----------------------------------------------------
    @classmethod
    def _flush_once(cls):
        t0 = time.time()

        # ---------- copy the buffers, then clear them ----------
        if not cls._lock.acquire(timeout=cls.LOCK_TIMEOUT):
            raise RuntimeError("flush could not acquire cls._lock, possible deadlock")
        try:
            op_rows = cls._buf_op[:]
            vid_rows = cls._buf_vid[:]
            payloads = cls._buf_payload[:]
            cls._clear_buffers()
        finally:
            cls._lock.release()

        if not op_rows and not vid_rows:
            return

        # ---------- write video_author ----------
        authors_map = {}
        now_ts = int(time.time())
        for d in payloads:
            uxid = d.get("u_xid")
            if not uxid:
                continue
            authors_map[uxid] = dict(
                u_xid=uxid,
                u_id=d.get("u_id", 0),
                u_name=d.get("u_name"),
                u_pic=d.get("u_pic"),
                follow_number=d.get("fans", 0),
                v_number=d.get("videos", 0),
                pv_number=0,
                b_number=0,
                create_time=datetime.utcnow(),
                update_time=now_ts,
            )
        if authors_map:
            stmt_auth = mysql_insert(video_author).values(list(authors_map.values()))
            ondup_auth = stmt_auth.on_duplicate_key_update(
                u_name=stmt_auth.inserted.u_name,
                u_pic=stmt_auth.inserted.u_pic,
                follow_number=stmt_auth.inserted.follow_number,
                v_number=stmt_auth.inserted.v_number,
                update_time=stmt_auth.inserted.update_time,
            )
            cls._safe_execute(ondup_auth, desc="video_author")

        # ---------- write video_op ----------
        if op_rows:
            stmt_op = mysql_insert(video_op).values(op_rows)
            ondup_op = stmt_op.on_duplicate_key_update(
                updatetime=stmt_op.inserted.updatetime,
                # ts_status=stmt_op.inserted.ts_status,
                is_repeat=stmt_op.inserted.is_repeat,
            )
            cls._safe_execute(ondup_op, desc="video_op")
            logger.info("flushed %d op rows", len(op_rows))

        # ---------- write video ----------
        if vid_rows:
            stmt_vid = mysql_insert(video).values(vid_rows)
            ondup_vid = stmt_vid.on_duplicate_key_update(
                title=stmt_vid.inserted.title,
                v_name=stmt_vid.inserted.v_name,
                link=stmt_vid.inserted.link,
                edition=stmt_vid.inserted.edition,
                duration=stmt_vid.inserted.duration,
                watch_number=stmt_vid.inserted.watch_number,
                follow_number=stmt_vid.inserted.follow_number,
                video_number=stmt_vid.inserted.video_number,
                public_time=stmt_vid.inserted.public_time,
                cover_pic=stmt_vid.inserted.cover_pic,
                sort=stmt_vid.inserted.sort,
                u_xid=stmt_vid.inserted.u_xid,
                u_id=stmt_vid.inserted.u_id,
                u_pic=stmt_vid.inserted.u_pic,
                u_name=stmt_vid.inserted.u_name,
                status=stmt_vid.inserted.status,
                updatetime=stmt_vid.inserted.updatetime,
            )
            cls._safe_execute(ondup_vid, desc="video")
            logger.info("flushed %d video rows", len(vid_rows))

        logger.debug("[flush] round took %.3f s", time.time() - t0)

    # ----------------------------------------------------
    # clear the buffers
    # ----------------------------------------------------
    @classmethod
    def _clear_buffers(cls):
        cls._buf_op.clear()
        cls._buf_vid.clear()
        cls._buf_payload.clear()
        cls._existing_op_keys.clear()
        cls._existing_vid_keys.clear()
        cls._last_flush = time.time()

    # ----------------------------------------------------
    # optional: single background flusher thread
    # ----------------------------------------------------
    @classmethod
    def start_single_flusher(cls):
        cls._queue_mode = True

        def _worker():
            batch = []
            while True:
                try:
                    data = cls._queue.get(timeout=3)
                    batch.append(data)
                    # Drain whatever else is already queued.
                    while True:
                        try:
                            batch.append(cls._queue.get_nowait())
                        except Empty:
                            break
                except Empty:
                    pass
                if batch:
                    for d in batch:
                        cls._buffer_without_lock(d)
                    batch.clear()
                # Honor both the row-count and the time threshold, so buffered
                # rows cannot sit forever when traffic is light.
                if cls._buf_vid and (
                        len(cls._buf_vid) >= cls.FLUSH_EVERY_ROWS
                        or time.time() - cls._last_flush >= cls.FLUSH_INTERVAL):
                    cls.flush()

        threading.Thread(target=_worker, daemon=True).start()
        logger.info("background flusher thread started (single-writer mode)")
    # ----------------------------------------------------
    # move queued data into the buffers (no thread lock taken)
    # ----------------------------------------------------
    @classmethod
    def _buffer_without_lock(cls, data):
        data = copy.deepcopy(data)
        data.setdefault("is_repeat", 3)
        data.setdefault("keyword", "")
        data["sort"] = data.get("index", 0)
        now_ts = int(time.time())
        op_key = (data["v_xid"] or "", data["keyword"] or "", now_ts)
        vid_key = (data["v_xid"] or "", data["v_name"] or "")
        if op_key in cls._existing_op_keys or vid_key in cls._existing_vid_keys:
            return
        op_row = dict(
            v_id=data["v_id"],
            v_xid=data["v_xid"],
            a_id=data.get("a_id", 0),
            level=data.get("level", 0),
            name_title=data["v_name"],
            keyword=data["keyword"],
            is_repeat=data["is_repeat"],
            sort=data["sort"],
            createtime=now_ts,
            updatetime=now_ts,
            batch=data.get("batch", 0),
            machine=data.get("machine_id", 0),
            is_piracy=data.get("is_piracy", '3'),
            ts_status=data.get("ts_status", 1),
            rn=data.get("rn", ""),
        )
        vid_row = dict(
            v_id=data["v_id"],
            v_xid=data["v_xid"],
            title=data["title"],
            v_name=data["v_name"],
            link=data["link"],
            edition="",
            duration=str(data["duration"]) if data.get("duration") else '0',
            public_time=data["create_time"],
            cover_pic=data["cover_pic"],
            sort=data["sort"],
            u_xid=data["u_xid"],
            u_id=data["u_id"],
            u_pic=data["u_pic"],
            u_name=data["u_name"],
            status=1,
            createtime=now_ts,
            updatetime=now_ts,
            watch_number=data.get("view", 0),
            follow_number=data.get("fans", 0),
            video_number=data.get("videos", 0),
        )
        # keep only columns that actually exist on the video table
        vid_row = {k: v for k, v in vid_row.items() if k in video.c}
        cls._buf_op.append(op_row)
        cls._buf_vid.append(vid_row)
        cls._buf_payload.append(data)
        cls._existing_op_keys.add(op_key)
        cls._existing_vid_keys.add(vid_key)
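

# Minimal demo of the DBSA buffered-writer path (illustrative; every field
# value below is a made-up placeholder that merely matches the keys
# upsert_video() reads, and running it performs real writes against the
# configured database):
if __name__ == "__main__":
    DBSA.upsert_video({
        "v_id": "demo-1",
        "v_xid": "x-demo-1",
        "v_name": "demo video",
        "title": "demo title",
        "link": "http://example.com/video/x-demo-1",
        "duration": 60,
        "create_time": "2024-01-01 00:00:00",
        "cover_pic": "",
        "u_xid": "u-demo",
        "u_id": "1",
        "u_pic": "",
        "u_name": "demo-user",
        "index": 0,
        "keyword": "demo",
    })
    DBSA.flush()  # force the buffered rows out instead of waiting for a threshold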