diff --git a/DB.py b/DB.py index 7154462..89169cf 100644 --- a/DB.py +++ b/DB.py @@ -13,7 +13,6 @@ from sqlalchemy import ( from sqlalchemy.dialects.mysql import insert as mysql_insert from logger import logger - MYSQL_URL = ( "mysql+pymysql://db_vidcon:rexdK4fhCCiRE4BZ" "@192.144.230.75:3306/db_vidcon?charset=utf8mb4" @@ -213,13 +212,23 @@ class DBVidcon: self.redis.lpush(self.l2_list_key, *raws) logger.info(f"[写入l2] 已推入 {len(raws)} 条") - @mysql_retry() + @redis_retry(max_retries=3) def get_proxy_agent_dict(self) -> dict: - sql = "SELECT rn, parameter FROM proxy_agent" - self.cursor.execute(sql) - rows = self.cursor.fetchall() - result = {row['rn']: row['parameter'] for row in rows} - return result + try: + meta_json = self.redis.hget("proxy_config", "meta") + except Exception as e: + logger.info("[Redis get_proxy_parameter error]", exc_info=e) + self.reconnect_redis() + return {} + + if not meta_json: + return {} + + try: + return json.loads(meta_json) + except (json.JSONDecodeError, TypeError) as e: + logger.warning("[Proxy meta JSON decode error]", exc_info=e) + return {} @mysql_retry() def get_proxy_parameter(self, rn: str) -> str: @@ -424,19 +433,16 @@ class DBVidcon: @redis_retry(max_retries=3) def get_proxy(self, region_code: str) -> str: - """ - 从 Redis 队列 proxy_queue: 弹出一个代理并返回。 - 如果队列为空,阻塞 - """ - proxy = "" - while True: - key = f"proxy_queue:{region_code}" - proxy = self.redis.lpop(key) - if proxy is None: - time.sleep(10) - else: - break - return proxy + list_json_str = self.redis.hget("proxy_config", "list") or "[]" + conf_str = self.redis.hget("proxy_config", "conf") or "0" + + try: + proxies = json.loads(list_json_str) + idx = int(conf_str) + entry = proxies[idx] + return entry.get("data", {}).get(region_code, "") + except (ValueError, IndexError, KeyError, json.JSONDecodeError): + return "" @redis_retry(max_retries=3) def queues_empty(self) -> bool: @@ -590,7 +596,7 @@ class DBSA: logger.info("[Redis 回退失败]", re) @classmethod - def update_video_stats(cls, locator:dict, stats:dict) -> int: + def update_video_stats(cls, locator: dict, stats: dict) -> int: """ 立即更新 sh_dm_video_v2 表中的统计字段。n :param locator: 用于定位行的字典,必须包含: v_xid, rn @@ -619,7 +625,7 @@ class DBSA: return result.rowcount @classmethod - def update_video_stats_async(cls, locator:dict, stats:dict) -> None: + def update_video_stats_async(cls, locator: dict, stats: dict) -> None: """ 异步更新 sh_dm_video_v2 表中的统计字段,立即返回,不阻塞调用线程。 """ @@ -628,4 +634,4 @@ class DBSA: args=(locator, stats), daemon=True ) - thread.start() \ No newline at end of file + thread.start() diff --git a/main.py b/main.py index 5d1b86a..8e1df32 100644 --- a/main.py +++ b/main.py @@ -357,6 +357,8 @@ executor = concurrent.futures.ThreadPoolExecutor(MAX_WORKERS) def integrate_data_parallel(): while True: + global proxiesdict + proxiesdict = db.get_proxy_agent_dict() tasks, flag = db.item_keyword() if not tasks: time.sleep(10)