import json
import time

import pymysql
import redis


class DBVidcon:
    """Data-access helper that owns one MySQL connection and one Redis client.

    Redis is used as a set of work queues (lists); MySQL receives the video
    rows.  MySQL writes are NOT committed automatically: call ``flush()``
    after a batch of ``upsert_video()`` calls.
    """

    # NOTE(review): hosts and passwords are hard-coded in source control.
    # They are kept unchanged here so existing deployments keep working, but
    # they should be moved to environment variables or a secret store.
    _MYSQL_CONF = {
        "host": "192.144.230.75",
        "port": 3306,
        "user": "db_vidcon",
        "password": "rexdK4fhCCiRE4BZ",
        "database": "db_vidcon",
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
    }

    _REDIS_CONF = {
        "host": "192.144.230.75",
        "port": 6379,
        "password": "qwert@$123!&",
        "decode_responses": True,
    }

    _SQL_INSERT_OP = """
        INSERT INTO sh_dm_video_op_v2 (
            v_id, v_xid, a_id, level, name_title,
            keyword, rn, history_status, is_repeat,
            sort, createtime, updatetime, batch, machine
        ) VALUES (
            %(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s,
            %(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s,
            %(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s
        )
    """

    _SQL_DELETE_VIDEO = """
        DELETE FROM sh_dm_video_v2
        WHERE rn = %(rn)s
          AND v_xid = %(v_xid)s
    """

    _SQL_INSERT_VIDEO = """
        INSERT INTO sh_dm_video_v2 (
            v_id, v_xid, rn, v_name, title, link,
            is_piracy, edition, duration,
            watch_number, follow_number, video_number,
            public_time, cover_pic, sort,
            u_xid, u_id, u_pic, u_name,
            status, createtime, updatetime
        ) VALUES (
            %(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s,
            3, '', %(duration)s,
            %(watch_number)s, %(fans)s, %(videos)s,
            %(create_time)s, %(cover_pic)s, %(sort)s,
            %(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s,
            1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP()
        )
    """

    def __init__(self):
        """Open the MySQL connection/cursor and create the Redis client."""
        self.list_key = "video_kw_queue"
        self.record_list_key = "record_kw_queue"
        self.urgent_list_key = "video_urgent_queue"
        self.conn = pymysql.connect(**self._MYSQL_CONF)
        self.cursor = self.conn.cursor()
        self._connect_redis()

    # ------------------------------------------------------------------
    # Redis plumbing
    # ------------------------------------------------------------------
    def _connect_redis(self):
        """Create (or re-create) the Redis client from ``_REDIS_CONF``."""
        self.redis = redis.Redis(**self._REDIS_CONF)

    def reconnect_redis(self):
        """Rebuild the Redis client after a ``ConnectionError``.

        Failures while rebuilding are printed and followed by a 2 second
        pause; they are not re-raised.
        """
        try:
            self._connect_redis()
        except Exception as e:
            print("[Redis reconnect error]", e)
            time.sleep(2)

    def _pop_batch(self, key, count):
        """Pop up to ``count`` raw items from the head of list ``key``.

        Always returns a list (possibly empty).  Redis errors propagate to
        the caller, which decides how to recover.
        """
        try:
            raws = self.redis.lpop(key, count)
        except TypeError:
            # redis-py versions whose lpop() takes no ``count`` argument:
            # pop one at a time from the SAME end (head) so queue order does
            # not depend on the installed client version.
            raws = []
            for _ in range(count):
                item = self.redis.lpop(key)
                if item is None:
                    break
                raws.append(item)
        if not raws:
            return []
        if isinstance(raws, str):
            return [raws]
        return raws

    @staticmethod
    def _decode_batch(raws):
        """Turn raw JSON strings into ``(raw, parsed)`` pairs.

        Items that are not valid JSON are skipped; since they were already
        popped from Redis they are effectively dropped.
        """
        out = []
        for raw in raws:
            try:
                out.append((raw, json.loads(raw)))
            except json.JSONDecodeError:
                continue
        return out

    def _requeue(self, op, key, values, tag):
        """Push ``values`` onto ``key`` with ``op`` ("lpush" or "rpush").

        On a connection error the error is printed with ``tag``, the client
        is rebuilt and the push is attempted once more; a second failure
        propagates.  The method is looked up on ``self.redis`` each time
        because reconnecting replaces the client object.
        """
        try:
            getattr(self.redis, op)(key, *values)
        except redis.exceptions.ConnectionError as e:
            print(tag, e)
            self.reconnect_redis()
            getattr(self.redis, op)(key, *values)

    # ------------------------------------------------------------------
    # Record queue
    # ------------------------------------------------------------------
    def push_record(self, data: dict):
        """Serialize ``data`` as JSON and LPUSH it onto the record queue.

        Retries once after reconnecting if the connection was lost.
        """
        raw = json.dumps(data, ensure_ascii=False)
        try:
            self.redis.lpush(self.record_list_key, raw)
        except redis.exceptions.ConnectionError:
            self.reconnect_redis()
            self.redis.lpush(self.record_list_key, raw)

    def fetch_records(self, count: int = 100):
        """Pop up to ``count`` items from the record queue.

        Returns a list of ``(raw_json, parsed)`` tuples (empty when the
        queue is empty).  On a connection error the client is rebuilt and
        the pop is retried once; a second failure propagates instead of
        recursing without bound.
        """
        try:
            raws = self._pop_batch(self.record_list_key, count)
        except redis.exceptions.ConnectionError:
            self.reconnect_redis()
            raws = self._pop_batch(self.record_list_key, count)
        return self._decode_batch(raws)

    def rollback_records(self, raws):
        """Push raw JSON item(s) back onto the record queue (LPUSH).

        Accepts a single string or an iterable of strings; empty input is a
        no-op.
        """
        if not raws:
            return
        values = [raws] if isinstance(raws, str) else list(raws)
        self._requeue(
            "lpush", self.record_list_key, values,
            "[Redis record rollback error]",
        )

    # ------------------------------------------------------------------
    # Keyword / urgent queues
    # ------------------------------------------------------------------
    def fetch_from_redis(self, count: int = 100, list_key: str = None):
        """Pop up to ``count`` items from ``list_key``.

        ``list_key`` defaults to the normal keyword queue
        (``self.list_key``) when omitted.  Returns a list of
        ``(raw_json, parsed)`` tuples; on a connection error the error is
        printed, the client is rebuilt and ``[]`` is returned.
        """
        key = self.list_key if list_key is None else list_key
        try:
            raws = self._pop_batch(key, count)
        except redis.exceptions.ConnectionError as e:
            print("[Redis pop error]", e)
            self.reconnect_redis()
            return []
        return self._decode_batch(raws)

    def rollback_urgent(self, raws):
        """Return raw JSON item(s) to the urgent queue.

        Intended for when the DB write or the urgent task itself failed.
        Accepts a single string or an iterable of strings; empty input is a
        no-op.
        """
        if not raws:
            return
        values = [raws] if isinstance(raws, str) else list(raws)
        self._requeue(
            "lpush", self.urgent_list_key, values,
            "[Redis urgent rollback error]",
        )

    def item_keyword(self, count: int = 100):
        """Fetch the next batch of work, preferring the urgent queue.

        Returns ``(items, flag)`` where ``flag`` is 1 when the items came
        from the urgent queue, 2 when they came from the normal queue
        (``items`` may be empty), and 0 when reading the normal queue raised.
        """
        try:
            urgent_items = self.fetch_from_redis(
                count, list_key=self.urgent_list_key
            )
        except Exception as e:
            print("[Redis urgent pop error]", e)
            self.reconnect_redis()
            urgent_items = []
        if urgent_items:
            return urgent_items, 1

        try:
            items = self.fetch_from_redis(count, list_key=self.list_key)
        except Exception as e:
            print("[Redis normal pop error]", e)
            self.reconnect_redis()
            return [], 0
        return items, 2

    def rollback(self, payloads):
        """Append payload(s) to the tail of the normal keyword queue (RPUSH).

        Accepts a single string or an iterable of strings; empty input is a
        no-op.
        """
        if not payloads:
            return
        if isinstance(payloads, str):
            payloads = [payloads]
        else:
            payloads = list(payloads)
        self._requeue(
            "rpush", self.list_key, payloads, "[Redis rollback error]"
        )
        print(f"[回滚] 已退回 {len(payloads)} 条")

    # ------------------------------------------------------------------
    # MySQL
    # ------------------------------------------------------------------
    def upsert_video(self, data: dict):
        """Write one video row; nothing is committed until ``flush()``.

        Steps:
          1) INSERT into sh_dm_video_op_v2
          2) DELETE FROM sh_dm_video_v2 WHERE rn = ... AND v_xid = ...
          3) INSERT INTO sh_dm_video_v2 (...) VALUES (...)

        ``data`` is modified in place: defaults are filled in for ``a_id``,
        ``history_status`` and ``is_repeat``, and ``sort`` is set from
        ``index`` (0 when absent).

        Raises:
            Exception: any error from the driver is printed together with
                the offending data and re-raised.  The transaction is NOT
                rolled back here; statements that already ran stay pending
                on the connection, so the caller must decide whether to
                roll back or commit.
        """
        # Fallback values for optional fields.
        data.setdefault("a_id", 0)
        data.setdefault("history_status", "")
        data.setdefault("is_repeat", 3)
        data["sort"] = data.get("index", 0)

        try:
            self.cursor.execute(self._SQL_INSERT_OP, data)
            self.cursor.execute(self._SQL_DELETE_VIDEO, data)
            self.cursor.execute(self._SQL_INSERT_VIDEO, data)
        except Exception as e:
            # Report the failure and hand it to the caller.
            print("[数据库写入异常]", str(e))
            print("[出错数据]", data)
            raise

    def flush(self):
        """Commit everything executed since the last commit."""
        self.conn.commit()

    def close(self):
        """Close the cursor and then the MySQL connection.

        The connection is closed even if closing the cursor raises.
        """
        try:
            self.cursor.close()
        finally:
            self.conn.close()

    # ------------------------------------------------------------------
    # Proxies
    # ------------------------------------------------------------------
    def get_proxy(self, region_code: str) -> str:
        """Pop one proxy from the Redis list ``proxy_queue:<region_code>``.

        Blocks until a proxy is available: while the list is empty it
        sleeps 10 seconds and tries again, so it never returns an empty
        value.
        """
        key = f"proxy_queue:{region_code}"
        while True:
            proxy = self.redis.lpop(key)
            if proxy is not None:
                return proxy
            time.sleep(10)