import json
import time

import pymysql
import redis


class DBVidcon:
    """MySQL + Redis data-access helper for the video crawler.

    Redis lists act as work queues: the normal keyword queue, the urgent
    queue, the error queue and per-region ``proxy_queue:<region>`` lists.
    MySQL holds the ``sh_dm_video_v2`` / ``sh_dm_video_op_v2`` tables and
    the ``proxy_agent`` lookup table.

    Statements issued by :meth:`upsert_video` are not committed until
    :meth:`flush` is called.
    """

    # NOTE(review): credentials are hard-coded in source; consider loading
    # them from environment variables or a secrets store instead.
    _MYSQL_CONF = {
        "host": "192.144.230.75",  # local alternative: "127.0.0.1"
        "port": 3306,  # local alternative: 3307
        "user": "db_vidcon",
        "password": "rexdK4fhCCiRE4BZ",
        "database": "db_vidcon",
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
    }
    _REDIS_CONF = {
        "host": "192.144.230.75",  # local alternative: "127.0.0.1"
        "port": 6379,  # local alternative: 6380
        "password": "qwert@$123!&",
        "decode_responses": True,
    }

    # Savepoint that scopes the rollback of a single upsert_video() call.
    _UPSERT_SAVEPOINT = "upsert_video"
    # Extra attempts fetch_records() makes after a Redis ConnectionError.
    _FETCH_RECORDS_RETRIES = 3

    def __init__(self):
        self.list_key = "video_kw_queue"
        self.error_list_key = "error_save_queue"
        self.urgent_list_key = "video_urgent_queue"
        # fetch_records() reads this key but it was never assigned, so every
        # call raised AttributeError. push_record() writes to
        # error_list_key, so that list is assumed to be the record queue.
        # TODO confirm the intended key.
        self.record_list_key = self.error_list_key
        self.conn = pymysql.connect(**self._MYSQL_CONF)
        self.cursor = self.conn.cursor()
        self.redis = redis.Redis(**self._REDIS_CONF)

    # ------------------------------------------------------------------ Redis

    def _connect_redis(self):
        """Create (or re-create) the Redis client from ``_REDIS_CONF``."""
        self.redis = redis.Redis(**self._REDIS_CONF)

    def reconnect_redis(self):
        """Rebuild the Redis client, e.g. after a ConnectionError.

        If building the client raises, the error is printed and the call
        sleeps for 2 seconds instead of propagating the exception.
        """
        try:
            self._connect_redis()
        except Exception as e:
            print("[Redis reconnect error]", e)
            time.sleep(2)

    def _redis_push(self, command, key, values, error_label=None):
        """Run ``LPUSH``/``RPUSH`` on *key* with all *values*.

        Args:
            command: ``"lpush"`` or ``"rpush"``.
            key: Redis list key.
            values: iterable of items to push; materialised to a list so a
                retry pushes the same items again.
            error_label: optional prefix printed with a connection error.

        On a ConnectionError the client is rebuilt and the push is retried
        once; a second failure propagates to the caller.
        """
        values = list(values)
        try:
            getattr(self.redis, command)(key, *values)
        except redis.exceptions.ConnectionError as e:
            if error_label:
                print(error_label, e)
            self.reconnect_redis()
            # Resolve the method again: reconnect_redis() replaced self.redis.
            getattr(self.redis, command)(key, *values)

    def _pop_raw(self, key, count):
        """Pop up to *count* raw items from the head (left end) of list *key*.

        Uses ``LPOP key count`` when the client accepts a count. Older
        redis-py versions reject that argument with TypeError; in that case
        items are popped one at a time from the same left end, so both paths
        consume the queue in the same order. Connection errors propagate.

        Returns a list (possibly empty) of raw items.
        """
        try:
            raws = self.redis.lpop(key, count)
        except TypeError:
            raws = []
            for _ in range(count):
                item = self.redis.lpop(key)
                if item is None:
                    break
                raws.append(item)
        if not raws:
            return []
        if isinstance(raws, str):
            return [raws]
        return list(raws)

    @staticmethod
    def _decode_raws(raws):
        """Return ``(raw, parsed_json)`` pairs for every item that is valid JSON.

        Items that fail to parse are already removed from Redis, so they are
        logged (truncated) before being dropped rather than vanishing
        silently.
        """
        out = []
        for raw in raws:
            try:
                out.append((raw, json.loads(raw)))
            except json.JSONDecodeError:
                print("[JSON decode error] dropped item:", raw[:200])
        return out

    def push_record(self, data: dict):
        """Serialise *data* to JSON and ``LPUSH`` it onto ``error_list_key``."""
        raw = json.dumps(data, ensure_ascii=False)
        self._redis_push("lpush", self.error_list_key, [raw])

    def fetch_records(self, count: int = 100):
        """Pop up to *count* items from ``record_list_key`` and parse them.

        Returns a list of ``(raw, data)`` tuples; non-JSON items are skipped.
        On a Redis ConnectionError the client is rebuilt and the pop
        retried, at most ``_FETCH_RECORDS_RETRIES`` extra times with a short
        pause. If every attempt fails, ``[]`` is returned. Previously this
        recursed without bound.
        """
        attempts = self._FETCH_RECORDS_RETRIES + 1
        for attempt in range(attempts):
            try:
                raws = self._pop_raw(self.record_list_key, count)
            except redis.exceptions.ConnectionError as e:
                print("[Redis pop error]", e)
                self.reconnect_redis()
                # redis.Redis() connects lazily, so reconnect_redis() returns
                # immediately; back off briefly before the next attempt.
                if attempt + 1 < attempts:
                    time.sleep(1)
                continue
            return self._decode_raws(raws)
        return []

    def fetch_from_redis(self, count: int = 100, list_key: str = None):
        """Pop up to *count* items from *list_key* and parse them as JSON.

        Args:
            count: maximum number of items to pop.
            list_key: Redis list to read; defaults to ``self.list_key``.

        Returns a list of ``(raw, data)`` tuples; non-JSON items are skipped.
        On a Redis ConnectionError the client is rebuilt and ``[]`` is
        returned without retrying.
        """
        key = self.list_key if list_key is None else list_key
        try:
            raws = self._pop_raw(key, count)
        except redis.exceptions.ConnectionError as e:
            print("[Redis pop error]", e)
            self.reconnect_redis()
            return []
        return self._decode_raws(raws)

    def rollback_records(self, raws):
        """``LPUSH`` raw item(s) back onto ``urgent_list_key``.

        NOTE(review): despite the name this targets the urgent queue, not
        ``record_list_key`` -- confirm this is intended.
        """
        if isinstance(raws, str):
            raws = [raws]
        if not raws:
            return  # LPUSH with no values is rejected by Redis
        self._redis_push(
            "lpush", self.urgent_list_key, raws, "[Redis rollback error]"
        )

    def rollback_urgent(self, raws):
        """``LPUSH`` raw item(s) back onto ``urgent_list_key``, reconnecting once on error."""
        if isinstance(raws, str):
            raws = [raws]
        if not raws:
            return  # LPUSH with no values is rejected by Redis
        self._redis_push(
            "lpush", self.urgent_list_key, raws, "[Redis urgent rollback error]"
        )

    def item_keyword(self, count: int = 100):
        """Fetch the next batch of work, preferring the urgent queue.

        Returns:
            ``(items, 1)`` when the urgent queue yielded items,
            ``(items, 2)`` for the normal ``list_key`` queue (items may be
            empty), or ``([], 0)`` if reading the normal queue raised.
        """
        try:
            urgent_items = self.fetch_from_redis(
                count, list_key=self.urgent_list_key
            )
        except Exception as e:
            print("[Redis urgent pop error]", e)
            self.reconnect_redis()
            urgent_items = []
        if urgent_items:
            return urgent_items, 1

        try:
            items = self.fetch_from_redis(count, list_key=self.list_key)
        except Exception as e:
            print("[Redis normal pop error]", e)
            self.reconnect_redis()
            return [], 0
        return items, 2

    def rollback(self, payloads):
        """``RPUSH`` payload(s) back onto the tail of the normal ``list_key`` queue."""
        if not payloads:
            return
        if isinstance(payloads, str):
            payloads = [payloads]
        payloads = list(payloads)
        self._redis_push("rpush", self.list_key, payloads, "[Redis rollback error]")
        print(f"[回滚] 已退回 {len(payloads)} 条")

    def get_proxy(self, region_code: str) -> str:
        """Pop one proxy from the Redis list ``proxy_queue:<region_code>``.

        While the list is empty this polls every 10 seconds and blocks
        indefinitely until an item appears.
        """
        key = f"proxy_queue:{region_code}"
        while True:
            proxy = self.redis.lpop(key)
            if proxy is not None:
                return proxy
            time.sleep(10)

    # ------------------------------------------------------------------ MySQL

    def get_proxy_agent_dict(self) -> dict:
        """Return ``{rn: parameter}`` for every row of ``proxy_agent``."""
        self.cursor.execute("SELECT rn, parameter FROM proxy_agent")
        return {row['rn']: row['parameter'] for row in self.cursor.fetchall()}

    def get_proxy_parameter(self, rn: str) -> str:
        """Return ``proxy_agent.parameter`` for *rn*, or ``None`` if there is no row."""
        sql = "SELECT parameter FROM proxy_agent WHERE rn = %s LIMIT 1"
        self.cursor.execute(sql, (rn,))
        result = self.cursor.fetchone()
        return result['parameter'] if result else None

    def _rollback_upsert(self):
        """Undo only the statements issued by the current upsert_video attempt.

        A full ``conn.rollback()`` would also discard earlier upserts that
        have not been flushed yet. If the savepoint is unavailable, the
        method falls back to a full rollback. That happens when the
        SAVEPOINT statement itself failed, when the server already aborted
        the transaction, or when the connection dropped. Errors from that
        fallback propagate.
        """
        try:
            self.cursor.execute(f"ROLLBACK TO SAVEPOINT {self._UPSERT_SAVEPOINT}")
        except Exception as e:
            print("[savepoint rollback error]", e)
            self.conn.rollback()

    def upsert_video(self, data: dict):
        """Write one crawled video to MySQL: an op-log row plus a replaced main row.

        *data* is mutated in place:

        - defaults are filled for ``a_id``, ``history_status``, ``is_piracy``
          and ``is_repeat``;
        - ``sort`` is copied from ``index``;
        - ``is_repeat`` is overwritten by the existing ``sh_dm_video_v2``
          row for the same ``(rn, v_xid)``, if one exists.

        Steps, all inside the current uncommitted transaction:

          1. read the existing ``is_repeat``;
          2. insert a row into ``sh_dm_video_op_v2``;
          3. delete the old ``sh_dm_video_v2`` row;
          4. insert the fresh ``sh_dm_video_v2`` row.

        On any error, only this call's statements are rolled back (via a
        savepoint) and the write is retried once. If it still fails, the
        record is pushed to the Redis error queue via :meth:`push_record`.
        Nothing is committed here; call :meth:`flush`.
        """
        data.setdefault("a_id", 0)
        data.setdefault("history_status", "")
        data.setdefault("is_piracy", 3)
        data.setdefault("is_repeat", 3)
        data["sort"] = data.get("index", 0)

        max_retries = 1  # one retry after the first attempt
        attempt = 0
        while True:
            try:
                # Mark the start of this upsert so a failure can be undone
                # without discarding earlier, not-yet-flushed upserts.
                # Re-declaring the same savepoint name replaces the old one.
                self.cursor.execute(f"SAVEPOINT {self._UPSERT_SAVEPOINT}")

                # 1) Read the existing is_repeat flag, if any.
                select_repeat = """
                    SELECT is_repeat
                      FROM sh_dm_video_v2
                     WHERE rn = %(rn)s AND v_xid = %(v_xid)s
                     LIMIT 1
                """
                self.cursor.execute(select_repeat, data)
                row = self.cursor.fetchone()
                if row:
                    data['is_repeat'] = row.get('is_repeat', 3)

                # 2) Insert into the op table.
                sql_op = """
                    INSERT INTO sh_dm_video_op_v2 (
                        v_id, v_xid, a_id, level, name_title,
                        keyword, rn, history_status, is_repeat,
                        sort, createtime, updatetime, batch, machine
                    ) VALUES (
                        %(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s,
                        %(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s,
                        %(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s
                    )
                """
                self.cursor.execute(sql_op, data)

                # 3) Delete the old main-table row.
                sql_del = """
                    DELETE FROM sh_dm_video_v2
                     WHERE rn = %(rn)s AND v_xid = %(v_xid)s
                """
                self.cursor.execute(sql_del, data)

                # 4) Insert the fresh main-table row.
                sql_ins = """
                    INSERT INTO sh_dm_video_v2 (
                        v_id, v_xid, rn, v_name, title, link,
                        is_piracy, edition, duration,
                        watch_number, follow_number, video_number,
                        public_time, cover_pic, sort,
                        u_xid, u_id, u_pic, u_name,
                        status, createtime, updatetime,
                        is_repeat
                    ) VALUES (
                        %(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s,
                        %(is_piracy)s, '', %(duration)s,
                        %(watch_number)s, %(fans)s, %(videos)s,
                        %(create_time)s, %(cover_pic)s, %(sort)s,
                        %(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s,
                        1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(),
                        %(is_repeat)s
                    )
                """
                self.cursor.execute(sql_ins, data)
                break  # success: leave the retry loop

            except Exception as e:
                # Undo only this attempt's uncommitted changes.
                self._rollback_upsert()
                print("[数据库写入异常]", str(e))
                print("[出错数据]:", data)

                if attempt < max_retries:
                    attempt += 1
                    print(f"第 {attempt + 1} 次重试…")
                    continue

                # Still failing after the retry: hand the record to Redis.
                print("重试失败,将数据写入 Redis 以便后续处理")
                self.push_record(data)
                print("[交由Redis处理]")
                break

    def flush(self):
        """Commit all statements issued since the last commit (call after a batch)."""
        self.conn.commit()

    def close(self):
        """Close the MySQL cursor and connection."""
        self.cursor.close()
        self.conn.close()