From 9648f8b3e9374428c4b4d9b44d1ec4fc6a1e296f Mon Sep 17 00:00:00 2001 From: Franklin-F Date: Fri, 30 May 2025 20:42:39 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E6=9B=B4=E6=96=B0=E4=BB=A3=E7=90=86?= =?UTF-8?q?=E6=A3=80=E7=B4=A2=E6=96=B9=E6=B3=95=EF=BC=8C=E4=BB=A5=E4=BD=BF?= =?UTF-8?q?=E7=94=A8Redis=E8=8E=B7=E5=BE=97=E6=9B=B4=E5=A5=BD=E7=9A=84?= =?UTF-8?q?=E5=8F=AF=E9=9D=A0=E6=80=A7=E5=92=8C=E9=94=99=E8=AF=AF=E5=A4=84?= =?UTF-8?q?=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DB.py | 52 +++++++++++++++++++++++++++++----------------------- main.py | 2 ++ 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/DB.py b/DB.py index 7154462..89169cf 100644 --- a/DB.py +++ b/DB.py @@ -13,7 +13,6 @@ from sqlalchemy import ( from sqlalchemy.dialects.mysql import insert as mysql_insert from logger import logger - MYSQL_URL = ( "mysql+pymysql://db_vidcon:rexdK4fhCCiRE4BZ" "@192.144.230.75:3306/db_vidcon?charset=utf8mb4" @@ -213,13 +212,23 @@ class DBVidcon: self.redis.lpush(self.l2_list_key, *raws) logger.info(f"[写入l2] 已推入 {len(raws)} 条") - @mysql_retry() + @redis_retry(max_retries=3) def get_proxy_agent_dict(self) -> dict: - sql = "SELECT rn, parameter FROM proxy_agent" - self.cursor.execute(sql) - rows = self.cursor.fetchall() - result = {row['rn']: row['parameter'] for row in rows} - return result + try: + meta_json = self.redis.hget("proxy_config", "meta") + except Exception as e: + logger.info("[Redis get_proxy_parameter error]", exc_info=e) + self.reconnect_redis() + return {} + + if not meta_json: + return {} + + try: + return json.loads(meta_json) + except (json.JSONDecodeError, TypeError) as e: + logger.warning("[Proxy meta JSON decode error]", exc_info=e) + return {} @mysql_retry() def get_proxy_parameter(self, rn: str) -> str: @@ -424,19 +433,16 @@ class DBVidcon: @redis_retry(max_retries=3) def get_proxy(self, region_code: str) -> str: - """ - 从 Redis 队列 proxy_queue: 弹出一个代理并返回。 - 如果队列为空,阻塞 - """ - proxy = "" - while True: - key = f"proxy_queue:{region_code}" - proxy = self.redis.lpop(key) - if proxy is None: - time.sleep(10) - else: - break - return proxy + list_json_str = self.redis.hget("proxy_config", "list") or "[]" + conf_str = self.redis.hget("proxy_config", "conf") or "0" + + try: + proxies = json.loads(list_json_str) + idx = int(conf_str) + entry = proxies[idx] + return entry.get("data", {}).get(region_code, "") + except (ValueError, IndexError, KeyError, json.JSONDecodeError): + return "" @redis_retry(max_retries=3) def queues_empty(self) -> bool: @@ -590,7 +596,7 @@ class DBSA: logger.info("[Redis 回退失败]", re) @classmethod - def update_video_stats(cls, locator:dict, stats:dict) -> int: + def update_video_stats(cls, locator: dict, stats: dict) -> int: """ 立即更新 sh_dm_video_v2 表中的统计字段。n :param locator: 用于定位行的字典,必须包含: v_xid, rn @@ -619,7 +625,7 @@ class DBSA: return result.rowcount @classmethod - def update_video_stats_async(cls, locator:dict, stats:dict) -> None: + def update_video_stats_async(cls, locator: dict, stats: dict) -> None: """ 异步更新 sh_dm_video_v2 表中的统计字段,立即返回,不阻塞调用线程。 """ @@ -628,4 +634,4 @@ class DBSA: args=(locator, stats), daemon=True ) - thread.start() \ No newline at end of file + thread.start() diff --git a/main.py b/main.py index 5d1b86a..8e1df32 100644 --- a/main.py +++ b/main.py @@ -357,6 +357,8 @@ executor = concurrent.futures.ThreadPoolExecutor(MAX_WORKERS) def integrate_data_parallel(): while True: + global proxiesdict + proxiesdict = db.get_proxy_agent_dict() tasks, flag = db.item_keyword() if not tasks: time.sleep(10)