import json
import redis
import pymysql
import functools
from redis.exceptions import ConnectionError, TimeoutError
import time, copy, threading
from typing import List, Dict, Optional
from sqlalchemy import (
    create_engine, MetaData, Table, Column, BigInteger, Integer,
    String, Text, DateTime, tuple_
)
from sqlalchemy.dialects.mysql import insert as mysql_insert
from logger import logger
from datetime import datetime

MYSQL_URL = (
    "mysql+pymysql://db_vidcon:rexdK4fhCCiRE4BZ"
    "@192.144.230.75:3306/db_vidcon?charset=utf8mb4"
)

_engine = create_engine(
    MYSQL_URL,
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
    pool_recycle=3600,
    future=True,
)
_meta = MetaData()

video_op = Table(
    "sh_dm_video_op_v2", _meta,
    Column("v_id", BigInteger, primary_key=True),
    Column("v_xid", String(64)),
    Column("a_id", Integer),
    Column("level", Integer),
    Column("name_title", String(255)),
    Column("keyword", String(255)),
    Column("rn", String(8)),
    Column("history_status", String(32)),
    Column("is_repeat", Integer),
    Column("sort", Integer),
    Column("createtime", Integer),
    Column("updatetime", Integer),
    Column("batch", BigInteger),
    Column("machine", Integer),
)

video = Table(
    "sh_dm_video_v2", _meta,
    Column("v_id", BigInteger, primary_key=True),
    Column("v_xid", String(64)),
    Column("rn", String(8)),
    Column("v_name", String(255)),
    Column("title", String(255)),
    Column("link", Text),
    Column("edition", String(64)),
    Column("duration", Integer),
    Column("public_time", String(32)),
    Column("cover_pic", Text),
    Column("sort", Integer),
    Column("u_xid", String(64)),
    Column("u_id", BigInteger),
    Column("u_pic", Text),
    Column("u_name", String(255)),
    Column("status", Integer),
    Column("createtime", Integer),
    Column("updatetime", Integer),
    Column("watch_number", Integer),
    Column("follow_number", Integer),
    Column("video_number", Integer),
)

video_author = Table(
    "sh_dm_video_author", _meta,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("u_id", String(64), nullable=True, comment="用户内部ID"),
    Column("u_xid", String(64), nullable=False, unique=True, comment="用户外部ID(如第三方ID)"),
    Column("u_name", String(64), nullable=False, comment="用户名"),
    Column("u_pic", String(255), nullable=True, comment="用户头像URL"),
    Column("follow_number", Integer, nullable=True, default=0, comment="粉丝量"),
    Column("v_number", Integer, nullable=True, default=0, comment="用户发布的视频总数"),
    Column("pv_number", Integer, nullable=True, default=0, comment="盗版视频数"),
    Column("b_number", Integer, nullable=True, default=0, comment="打击视频数"),
    Column("create_time", DateTime, nullable=False, comment="入库时间"),
    Column("update_time", Integer, nullable=True, comment="更新时间(UNIX 时间戳)"),
)


def mysql_retry(max_retries: int = 3, base_delay: float = 2.0):
    """
    Decorator factory: after a pymysql InterfaceError, reconnect and retry.
    The wait grows exponentially: base_delay * 2**(attempt-1) seconds.
    """
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(self, *args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    # Keep the connection alive; reconnect=True re-opens it if ping fails
                    self.conn.ping(reconnect=True)
                    return fn(self, *args, **kwargs)
                except pymysql.InterfaceError as e:
                    wait = base_delay * (2 ** (attempt - 1))
                    logger.info(f"[MySQL][{fn.__name__}] 第{attempt}次 InterfaceError:{e},等待 {wait:.1f}s 后重连…")
                    time.sleep(wait)
                    self._reconnect_mysql()
                    if attempt == max_retries:
                        logger.info("[MySQL] 重试多次仍失败,抛出异常")
                        raise
        return wrapper
    return decorator


def redis_retry(max_retries: int = 3):
    """
    Decorator factory: retry a Redis call up to max_retries times.
    """
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(self, *args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return fn(self, *args, **kwargs)
                except (ConnectionError, TimeoutError) as e:
                    logger.info(f"[Redis][{fn.__name__}] 第 {attempt} 次失败:{e}")
                    self.reconnect_redis()
                    if attempt == max_retries:
                        logger.info("[Redis] 连接彻底失败")
                        raise
                    logger.info(f"[Redis] 重连后第 {attempt + 1} 次重试…")
        return wrapper
    return decorator


class DBVidcon:
    _MYSQL_CONF = {
        "host": "192.144.230.75",  # "127.0.0.1", #
        "port": 3306,  # 3307, #
        "user": "db_vidcon",
        "password": "rexdK4fhCCiRE4BZ",
        "database": "db_vidcon",
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
    }
    _REDIS_CONF = {
        "host": "192.144.230.75",  # "127.0.0.1", #
        "port": 6379,  # 6380, #
        "password": "qwert@$123!&",
        "decode_responses": True,
    }

    def __init__(self):
        self.l0_list_key = "video_l0_queue"
        self.l1_list_key = "video_l1_queue"
        self.l2_list_key = "video_l2_queue"
        self.error_list_key = "error_save_queue"
        self.conn = pymysql.connect(**self._MYSQL_CONF)
        self.cursor = self.conn.cursor()
        self.redis = redis.Redis(**self._REDIS_CONF)

    def _reconnect_mysql(self):
        """Close the broken MySQL connection (if any) and open a fresh one."""
        try:
            if self.cursor:
                self.cursor.close()
            if self.conn:
                self.conn.close()
        except Exception:
            pass
        self.conn = pymysql.connect(**self._MYSQL_CONF)
        self.cursor = self.conn.cursor()

    def _connect_redis(self):
        """Create (or re-create) the Redis client."""
        self.redis = redis.Redis(**self._REDIS_CONF)

    def reconnect_redis(self):
        """Reconnect to Redis after a ConnectionError."""
        try:
            self._connect_redis()
        except Exception as e:
            logger.info(f"[Redis reconnect error] {e}")
            time.sleep(2)

    @redis_retry(max_retries=3)
    def push_record(self, data: dict):
        raw = json.dumps(data, ensure_ascii=False)
        self.redis.lpush(self.error_list_key, raw)

    @redis_retry(max_retries=3)
    def fetch_from_redis(self, count: int = 100, list_key: str = None):
        key = list_key
        try:
            raws = self.redis.lpop(key, count)
        except TypeError:
            # Older redis-py / Redis without COUNT support: pop one at a time from the same end
            raws = []
            for _ in range(count):
                item = self.redis.lpop(key)
                if item is None:
                    break
                raws.append(item)
        except redis.exceptions.ConnectionError as e:
            logger.info(f"[Redis pop error] {e}")
            self.reconnect_redis()
            return []
        if not raws:
            return []
        if isinstance(raws, str):
            raws = [raws]
        out = []
        for raw in raws:
            try:
                out.append((raw, json.loads(raw)))
            except json.JSONDecodeError:
                continue
        return out

    @redis_retry(max_retries=3)
    def push_l0(self, raws):
        """Push data onto the l0 (urgent) queue."""
        if isinstance(raws, str):
            raws = [raws]
        self.redis.lpush(self.l0_list_key, *raws)
        logger.info(f"[写入l0] 已推入 {len(raws)} 条")

    @redis_retry(max_retries=3)
    def push_l1(self, payloads):
        """Push data onto the l1 (normal) queue."""
        if isinstance(payloads, str):
            payloads = [payloads]
        self.redis.rpush(self.l1_list_key, *payloads)
        logger.info(f"[写入l1] 已推入 {len(payloads)} 条")

    @redis_retry(max_retries=3)
    def push_l2(self, raws):
        """Push data onto the l2 (low-priority) queue."""
        if isinstance(raws, str):
            raws = [raws]
        self.redis.lpush(self.l2_list_key, *raws)
        logger.info(f"[写入l2] 已推入 {len(raws)} 条")

    @redis_retry(max_retries=3)
    def get_proxy_agent_dict(self) -> dict:
        try:
            meta_json = self.redis.hget("proxy_config", "meta")
        except Exception as e:
            logger.info("[Redis get_proxy_parameter error]", exc_info=e)
            self.reconnect_redis()
            return {}
        if not meta_json:
            return {}
        try:
            return json.loads(meta_json)
        except (json.JSONDecodeError, TypeError) as e:
            logger.warning("[Proxy meta JSON decode error]", exc_info=e)
            return {}

    @mysql_retry()
    def get_proxy_parameter(self, rn: str) -> str:
        sql = "SELECT parameter FROM proxy_agent WHERE rn = %s LIMIT 1"
        self.cursor.execute(sql, (rn,))
        result = self.cursor.fetchone()
        logger.info(result)
        return result['parameter'] if result else None

    @redis_retry(max_retries=3)
    def item_keyword(self, count: int = 20):
        try:
            items = self.fetch_from_redis(count, list_key=self.l0_list_key)
        except Exception as e:
            logger.info(f"[Redis l0 pop error] {e}")
            self.reconnect_redis()
            items = []
        if items:
            return items, 0
        try:
            items = self.fetch_from_redis(count, list_key=self.l1_list_key)
        except Exception as e:
            logger.info(f"[Redis l1 pop error] {e}")
            self.reconnect_redis()
            items = []
        if items:
            return items, 1
        try:
            items = self.fetch_from_redis(count, list_key=self.l2_list_key)
        except Exception as e:
            logger.info(f"[Redis l2 pop error] {e}")
            self.reconnect_redis()
            items = []
        return items, 99

    @redis_retry(max_retries=3)
    def rollback_l1(self, payloads):
        if isinstance(payloads, str):
            payloads = [payloads]
        self.redis.rpush(self.l1_list_key, *payloads)
        logger.info(f"[回滚l1] 已退回 {len(payloads)} 条")

    @redis_retry(max_retries=3)
    def rollback_l0(self, raws):
        if isinstance(raws, str):
            raws = [raws]
        self.redis.lpush(self.l0_list_key, *raws)
        logger.info(f"[回滚l0] 已退回 {len(raws)} 条")

    @redis_retry(max_retries=3)
    def rollback_l2(self, raws):
        if isinstance(raws, str):
            raws = [raws]
        self.redis.lpush(self.l2_list_key, *raws)
        logger.info(f"[回滚l2] 已退回 {len(raws)} 条")

    @mysql_retry()
    def upsert_video(self, data: dict):
        logger.info(f"DB处理->{data.get('v_xid')},\tlevel->{data.get('level')}")
        data.setdefault("a_id", 0)
        data.setdefault("history_status", "")
        data.setdefault("is_piracy", 3)
        data.setdefault("is_repeat", 3)
        data["sort"] = data.get("index", 0)

        max_retries = 1  # one extra retry on top of the first attempt
        attempt = 0
        while True:
            try:
                sql_op = """
                INSERT INTO sh_dm_video_op_v2 (
                    v_id, v_xid, a_id, level, name_title,
                    keyword, rn, history_status, is_repeat,
                    sort, createtime, updatetime, batch, machine
                ) VALUES (
                    %(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s,
                    %(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s,
                    %(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s
                )
                """
                self.cursor.execute(sql_op, data)

                sql_update = """
                INSERT INTO sh_dm_video_v2 (
                    v_id, v_xid, rn, v_name, title, link, edition, duration,
                    public_time, cover_pic, sort, u_xid, u_id, u_pic, u_name,
                    status, createtime, updatetime
                ) VALUES (
                    %(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s, '', %(duration)s,
                    %(create_time)s, %(cover_pic)s, %(sort)s, %(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s,
                    1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP()
                )
                ON DUPLICATE KEY UPDATE
                    title = VALUES(title),
                    duration = VALUES(duration),
                    cover_pic = VALUES(cover_pic),
                    sort = VALUES(sort),
                    updatetime = UNIX_TIMESTAMP();
                """
                self.cursor.execute(sql_update, data)
                break  # success: leave the retry loop
            except Exception as e:
                # Roll back the uncommitted changes of this attempt
                self.conn.rollback()
                logger.info(f"[数据库写入异常] {e}")
                logger.info(f"[出错数据]: {data}")
                if attempt < max_retries:
                    attempt += 1
                    logger.info(f"第 {attempt} 次重试…")
                    continue
                else:
                    # Still failing after the retry: hand the record to Redis for later processing
                    logger.info("重试失败,将数据写入 Redis 以便后续处理")
                    self.push_record(data)
                    logger.info("[交由Redis处理]")
                    break

    @mysql_retry()
    def fetch_keyword_title(self, level: int = 99):
        sql = """
        SELECT k.keyword, k.rn, t.title AS v_name, ANY_VALUE(t.level) AS level
        FROM sh_dm_keyword k
        LEFT JOIN sh_dm_title t ON k.title = t.title
        WHERE k.status = 1
          AND t.status = 1
          AND NOT EXISTS (
              SELECT 1 FROM sh_dm_black_keyword b WHERE b.title = t.title
          )
          AND t.level = %s
        GROUP BY k.keyword, k.rn
        """
        self.cursor.execute(sql, (level,))
        return self.cursor.fetchall()

    @mysql_retry()
    def log_batch_start(self, info: Dict) -> Optional[int]:
        batch = info.get("batch")
        level = info.get("level")
        if batch is None or level is None:
            raise ValueError("info 字典必须包含 'batch' 和 'level'")
        count = info.get("count", 0)
        if level == 0:
            t0, t1, t2 = count, 0, 0
        elif level == 1:
            t0, t1, t2 = 0, count, 0
        elif level == 9:
            level = 2
            t0, t1, t2 = 0, 0, count
        else:
            # Unexpected level: fail explicitly instead of hitting an unbound variable below
            raise ValueError(f"未知 level: {level}")
        start_ts = int(time.time())
        sql = """
        INSERT INTO sh_dm_batch_log (batch, info, t0, t1, t2, starttime)
        VALUES (%s, %s, %s, %s, %s, %s)
        """
        try:
            self.cursor.execute(sql, (batch, level, t0, t1, t2, start_ts))
            self.conn.commit()
            return self.cursor.lastrowid
        except Exception as e:
            logger.info(f"[log_batch_start] 插入失败:{e}")
            return None

    @mysql_retry()
    def flush(self):
        """Commit manually after a batch of statements has been executed."""
        self.conn.commit()

    def close(self):
        try:
            if self.cursor:
                self.cursor.close()
        finally:
            if self.conn:
                self.conn.close()
        try:
            if hasattr(self, "redis") and self.redis:
                # redis-py >= 4.2 exposes .close() directly
                if hasattr(self.redis, "close"):
                    self.redis.close()
                else:
                    self.redis.connection_pool.disconnect()
        except Exception as e:
            logger.info(f"[Redis close error] {e}")

    @redis_retry(max_retries=3)
    def get_proxy(self, region_code: str) -> str:
        region_code = region_code.upper()
        list_json_str = self.redis.hget("proxy_config", "list") or "[]"
        conf_str = self.redis.hget("proxy_config", "conf") or "0"
        try:
            proxies = json.loads(list_json_str)
            idx = int(conf_str)
            entry = proxies[idx]
            result = entry.get("data", {}).get(region_code, "")
            if not result.startswith("http://"):
                result = "http://" + result
            return result
        except (ValueError, IndexError, KeyError, json.JSONDecodeError):
            return ""

    @redis_retry(max_retries=3)
    def queues_empty(self) -> bool:
        """
        Return True only if all three queues are empty; False as soon as any one has data.
        """
        return (
            self.redis.llen(self.l0_list_key) == 0
            and self.redis.llen(self.l1_list_key) == 0
            and self.redis.llen(self.l2_list_key) == 0
        )

    @redis_retry()
    def l0_empty(self) -> bool:
        return self.redis.llen(self.l0_list_key) == 0

    @redis_retry()
    def l1_empty(self) -> bool:
        return self.redis.llen(self.l1_list_key) == 0

    @redis_retry()
    def l2_empty(self) -> bool:
        return self.redis.llen(self.l2_list_key) == 0

    @redis_retry(max_retries=3)
    def pop_error_item(self):
        """
        Pop one error record from error_list_key (lpop);
        return None when the queue is empty.
        """
        item = self.redis.lpop(self.error_list_key)
        # If the stored value is a JSON string, it can be deserialized here:
        return json.loads(item) if item is not None else None


class DBSA:
    FLUSH_EVERY_ROWS = 100  # row threshold
    FLUSH_INTERVAL = 40     # seconds threshold

    _buf_op: List[Dict] = []
    _buf_vid: List[Dict] = []
    _buf_payload: List[Dict] = []
    _last_flush: float = time.time()
    _lock = threading.Lock()

    push_record_many = staticmethod(
        lambda rows: logger.info(f"[退回Redis] cnt={len(rows)}")
    )

    @classmethod
    def upsert_video(cls, data):
        data = copy.deepcopy(data)
        data.setdefault("a_id", 0)
        data.setdefault("history_status", "")
        data.setdefault("is_repeat", 3)  # avoid KeyError
        data["sort"] = data.get("index", 0)

        now_ts = int(time.time())
        op_row = {
            "v_id": data["v_id"],
            "v_xid": data["v_xid"],
            "a_id": data["a_id"],
            "level": data["level"],
            "name_title": data["v_name"],
            "keyword": data["keyword"],
            "rn": data["rn"],
            "history_status": data["history_status"],
            "is_repeat": data["is_repeat"],
            "sort": data["sort"],
            "createtime": now_ts,
            "updatetime": now_ts,
            "batch": data["batch"],
            "machine": data["machine_id"],
        }
        vid_row = {
            "v_id": data["v_id"],
            "v_xid": data["v_xid"],
            "rn": data["rn"],
            "v_name": data["v_name"],
            "title": data["title"],
            "link": data["link"],
            "edition": "",
            "duration": data["duration"],
            "public_time": data["create_time"],
            "cover_pic": data["cover_pic"],
            "sort": data["sort"],
            "u_xid": data["u_xid"],
            "u_id": data["u_id"],
            "u_pic": data["u_pic"],
            "u_name": data["u_name"],
            "status": 1,
            "createtime": now_ts,
            "updatetime": now_ts,
            "watch_number": data.get("view", 0),
            "follow_number": data.get("fans", 0),
            "video_number": data.get("videos", 0),
        }

        with cls._lock:
            cls._buf_op.append(op_row)
            cls._buf_vid.append(vid_row)
            cls._buf_payload.append(data)  # keep the original payload
            buf_len = len(cls._buf_vid)
            # logger.info(f"DB缓冲 -> xid={data['v_xid']}, level={data['level']}, buffer={buf_len}")

        # Decide outside the lock: flush() acquires the same (non-reentrant) lock
        need_flush = False
        flush_reason = ""
        if buf_len >= cls.FLUSH_EVERY_ROWS:
            need_flush = True
            flush_reason = "ROWS"
        elif time.time() - cls._last_flush >= cls.FLUSH_INTERVAL:
            need_flush = True
            flush_reason = "TIME"
        if need_flush:
            logger.info(f"DBSA 落 ({flush_reason}) ...")
            cls.flush()

    @classmethod
    def _bulk_insert(cls, rows: List[Dict]):
        if not rows:
            return
        stmt = video_op.insert().values(rows)
        with _engine.begin() as conn:
            conn.execute(stmt)

    @classmethod
    def _bulk_upsert(cls, rows: List[Dict]):
        if not rows:
            return
        stmt = mysql_insert(video).values(rows)
        upd = {
            "title": stmt.inserted.title,
            "duration": stmt.inserted.duration,
            "cover_pic": stmt.inserted.cover_pic,
            "sort": stmt.inserted.sort,
            "updatetime": stmt.inserted.updatetime,
        }
        ondup = stmt.on_duplicate_key_update(**upd)
        with _engine.begin() as conn:
            conn.execute(ondup)

    @classmethod
    def flush(cls):
        with cls._lock:
            op_rows = cls._buf_op[:]
            vid_rows = cls._buf_vid[:]
            payloads = cls._buf_payload[:]
            cls._buf_op.clear()
            cls._buf_vid.clear()
            cls._buf_payload.clear()
            cls._last_flush = time.time()

        if not op_rows and not vid_rows:
            return

        existing_keys = set()
        if vid_rows:
            # Collect (v_xid, rn) pairs; they must match the unique index on the video table
            all_keys = list({(row["v_xid"], row["rn"]) for row in vid_rows})
            conn = _engine.connect()
            try:
                sel_vid = (
                    video.select()
                    .with_only_columns(video.c.v_xid, video.c.rn)
                    .where(tuple_(video.c.v_xid, video.c.rn).in_(all_keys))
                )
                existing_keys = {(row.v_xid, row.rn) for row in conn.execute(sel_vid).fetchall()}
            except Exception as e:
                logger.info(f"[DBSA] 查询 video 表时异常: {e}")
                try:
                    cls.push_record_many(payloads)
                except Exception as re:
                    logger.info(f"[Redis 回退失败] {re}")
                return
            finally:
                conn.close()

        for i, vid_row in enumerate(vid_rows):
            key = (vid_row["v_xid"], vid_row["rn"])
            if key in existing_keys:
                op_rows[i]["is_repeat"] = 1
            else:
                op_rows[i]["is_repeat"] = 2

        authors_map = {}
        now_ts = int(time.time())
        for data in payloads:
            u_xid = data.get("u_xid")
            if not u_xid:
                continue
            authors_map[u_xid] = {
                "u_id": data.get("u_id"),
                "u_xid": u_xid,
                "u_name": data.get("u_name"),
                "u_pic": data.get("u_pic"),
                "follow_number": data.get("fans", 0),
                "v_number": data.get("videos", 0),
                "pv_number": 0,
                "b_number": 0,
                "create_time": datetime.utcnow(),
                "update_time": now_ts,
            }
        author_rows = list(authors_map.values())
        if author_rows:
            stmt_author = mysql_insert(video_author).values(author_rows)
            upd_author = {
                "u_name": stmt_author.inserted.u_name,
                "u_pic": stmt_author.inserted.u_pic,
                "follow_number": stmt_author.inserted.follow_number,
                "v_number": stmt_author.inserted.v_number,
                "pv_number": stmt_author.inserted.pv_number,
                "b_number": stmt_author.inserted.b_number,
                "update_time": stmt_author.inserted.update_time,
            }
            ondup_author = stmt_author.on_duplicate_key_update(**upd_author)
            try:
                with _engine.begin() as conn2:
                    conn2.execute(ondup_author)
            except Exception as e:
                logger.info(f"[DBSA] 写作者表失败: {e}")
                try:
                    cls.push_record_many(payloads)
                except Exception as re:
                    logger.info(f"[Redis 回退失败] {re}")
                return

        if op_rows:
            try:
                cls._bulk_insert(op_rows)
            except Exception as e:
                logger.info(f"[DBSA] 写日志表失败: {e}")
                try:
                    cls.push_record_many(payloads)
                except Exception as re:
                    logger.info(f"[Redis 回退失败] {re}")
                return

        for vid_row in vid_rows:
            vid_row.pop("is_repeat", None)
            vid_row.pop("level", None)

        if vid_rows:
            try:
                cls._bulk_upsert(vid_rows)
                logger.info(
                    f"[DBSA] flush 完成:authors={len(author_rows)} 条,op={len(op_rows)} 条,video={len(vid_rows)} 条"
                )
            except Exception as e:
                logger.info(f"[DBSA] 写视频表失败: {e} op={len(op_rows)} video={len(vid_rows)}")
                try:
                    cls.push_record_many(payloads)
                except Exception as re:
                    logger.info(f"[Redis 回退失败] {re}")

    @classmethod
    def update_video_stats(cls, locator: dict, stats: dict) -> int:
        v_xid = locator.get("v_xid")
        rn = locator.get("rn")
        if not v_xid or not rn:
            raise ValueError("locator 必须包含 'v_xid' 和 'rn'")

        params = dict(stats)
        params["updatetime"] = int(time.time())

        # Keep only columns that actually exist on the video table
        valid_cols = set(video.c.keys())
        filtered_params = {k: v for k, v in params.items() if k in valid_cols}

        stmt = (
            video
            .update()
            .where(video.c.v_xid == v_xid, video.c.rn == rn)
            .values(**filtered_params)
        )
        with _engine.begin() as conn:
            result = conn.execute(stmt)
            return result.rowcount

    @classmethod
    def update_video_stats_async(cls, locator: dict, stats: dict) -> None:
        """
        Update the stats columns of sh_dm_video_v2 asynchronously;
        returns immediately without blocking the calling thread.
        """
        thread = threading.Thread(
            target=cls.update_video_stats,
            args=(locator, stats),
            daemon=True
        )
        thread.start()
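
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the original pipeline).
# It assumes the crawler pushes JSON payloads that already contain the fields
# DBSA.upsert_video reads (v_id, v_xid, rn, keyword, batch, machine_id, ...).
# It shows the intended flow: drain the priority queues via DBVidcon, buffer
# rows through DBSA, and flush whatever remains before exiting.
if __name__ == "__main__":
    db = DBVidcon()
    try:
        while not db.queues_empty():
            # items is a list of (raw_json, parsed_dict); level is 0 / 1 / 99
            items, level = db.item_keyword(count=20)
            for raw, payload in items:
                # payload is assumed to carry every field DBSA.upsert_video expects
                DBSA.upsert_video(payload)
        DBSA.flush()  # write out anything still sitting in the buffers
    finally:
        db.close()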