fix: 更新代理检索方法,以使用Redis获得更好的可靠性和错误处理

This commit is contained in:
晓丰 2025-05-30 20:42:39 +08:00
parent aab52002e9
commit 9648f8b3e9
2 changed files with 31 additions and 23 deletions

46
DB.py
View File

@ -13,7 +13,6 @@ from sqlalchemy import (
from sqlalchemy.dialects.mysql import insert as mysql_insert from sqlalchemy.dialects.mysql import insert as mysql_insert
from logger import logger from logger import logger
MYSQL_URL = ( MYSQL_URL = (
"mysql+pymysql://db_vidcon:rexdK4fhCCiRE4BZ" "mysql+pymysql://db_vidcon:rexdK4fhCCiRE4BZ"
"@192.144.230.75:3306/db_vidcon?charset=utf8mb4" "@192.144.230.75:3306/db_vidcon?charset=utf8mb4"
@ -213,13 +212,23 @@ class DBVidcon:
self.redis.lpush(self.l2_list_key, *raws) self.redis.lpush(self.l2_list_key, *raws)
logger.info(f"[写入l2] 已推入 {len(raws)}") logger.info(f"[写入l2] 已推入 {len(raws)}")
@mysql_retry() @redis_retry(max_retries=3)
def get_proxy_agent_dict(self) -> dict:
    """Return the proxy-agent mapping stored in Redis.

    Reads the ``meta`` field of the ``proxy_config`` hash and decodes it
    as JSON.  On a Redis failure the connection is re-established and an
    empty dict is returned; a missing field or malformed JSON also yields
    an empty dict, so callers always receive a dict.

    Returns:
        dict: the parsed mapping (presumably rn -> parameter, matching the
        old MySQL ``proxy_agent`` table -- confirm against writers of the
        ``proxy_config`` hash), or ``{}`` on any failure.
    """
    try:
        meta_json = self.redis.hget("proxy_config", "meta")
    except Exception as e:
        # Fix: the original message said "get_proxy_parameter" (copy/paste
        # from the sibling method) and logged an error condition at INFO.
        logger.warning("[Redis get_proxy_agent_dict error]", exc_info=e)
        self.reconnect_redis()
        return {}
    if not meta_json:
        # Field absent or empty -- nothing configured yet.
        return {}
    try:
        return json.loads(meta_json)
    except (json.JSONDecodeError, TypeError) as e:
        logger.warning("[Proxy meta JSON decode error]", exc_info=e)
        return {}
@mysql_retry() @mysql_retry()
def get_proxy_parameter(self, rn: str) -> str: def get_proxy_parameter(self, rn: str) -> str:
@ -424,19 +433,16 @@ class DBVidcon:
@redis_retry(max_retries=3) @redis_retry(max_retries=3)
def get_proxy(self, region_code: str) -> str:
    """Look up the proxy for *region_code* from the Redis proxy config.

    Reads two fields of the ``proxy_config`` hash: ``list`` (a JSON array
    of proxy entries) and ``conf`` (the index of the active entry), then
    returns ``entry["data"][region_code]`` for the selected entry.

    Args:
        region_code: region key inside the active entry's ``data`` map.

    Returns:
        str: the proxy for the region, or ``""`` when the config is
        missing, malformed, or has no value for the region.
    """
    list_json_str = self.redis.hget("proxy_config", "list") or "[]"
    conf_str = self.redis.hget("proxy_config", "conf") or "0"
    try:
        proxies = json.loads(list_json_str)
        idx = int(conf_str)
        entry = proxies[idx]
        return entry.get("data", {}).get(region_code, "")
    # TypeError/AttributeError added: a malformed entry (e.g. a JSON
    # string instead of an object) previously escaped the handler and
    # crashed the caller instead of falling back to "".
    except (ValueError, IndexError, KeyError, TypeError, AttributeError,
            json.JSONDecodeError):
        return ""
@redis_retry(max_retries=3) @redis_retry(max_retries=3)
def queues_empty(self) -> bool: def queues_empty(self) -> bool:

View File

@ -357,6 +357,8 @@ executor = concurrent.futures.ThreadPoolExecutor(MAX_WORKERS)
def integrate_data_parallel(): def integrate_data_parallel():
while True: while True:
global proxiesdict
proxiesdict = db.get_proxy_agent_dict()
tasks, flag = db.item_keyword() tasks, flag = db.item_keyword()
if not tasks: if not tasks:
time.sleep(10) time.sleep(10)