From 75cf8f73454b432434bc46136063a36946aaee07 Mon Sep 17 00:00:00 2001 From: Franklin-F Date: Sat, 17 May 2025 11:13:55 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E5=A7=94=E6=89=98?= =?UTF-8?q?=E4=BB=A3=E7=90=86=E6=A3=80=E7=B4=A2=E6=96=B9=E6=B3=95=E5=B9=B6?= =?UTF-8?q?=E9=87=8D=E6=9E=84=E4=BB=A3=E7=90=86=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DB.py | 18 +++++++++++++++++- main.py | 17 ++--------------- multi_proxy_refill.py | 32 ++++++++------------------------ 3 files changed, 27 insertions(+), 40 deletions(-) diff --git a/DB.py b/DB.py index 98ee0d1..e6827c5 100644 --- a/DB.py +++ b/DB.py @@ -104,6 +104,21 @@ class DBVidcon: continue return out + def get_proxy_agent_dict(self) -> dict: + sql = "SELECT rn, parameter FROM proxy_agent" + self.cursor.execute(sql) + rows = self.cursor.fetchall() + result = {rn: param for rn, param in rows} + self.close() + return result + + def get_proxy_parameter(self, rn: str) -> str: + sql = "SELECT parameter FROM proxy_agent WHERE rn = %s LIMIT 1" + self.cursor.execute(sql, (rn,)) + result = self.cursor.fetchone() + self.close() + return result[0] if result else None + def rollback_records(self, raws): if isinstance(raws, str): raws = [raws] @@ -240,11 +255,12 @@ class DBVidcon: self.cursor.close() self.conn.close() - def get_proxy(self, region_code: str) -> str: + def get_proxy(self, region_name: str) -> str: """ 从 Redis 队列 proxy_queue: 弹出一个代理并返回。 如果队列为空,阻塞 """ + region_code = self.get_proxy_parameter(region_name) proxy = "" while True: key = f"proxy_queue:{region_code}" diff --git a/main.py b/main.py index e7b51ed..b2fb401 100644 --- a/main.py +++ b/main.py @@ -11,18 +11,6 @@ from dateutil import parser as date_parser batch = str(int(time.time())) db = DBVidcon() -proxies_address = { - "印度尼西亚": "ID", - "马来": "MY", - "加拿大": "CA", - "台湾": "CN_city_TW", # "TW", # - "泰国": "TH", - "美国": "US", - "西班牙": "ES", - "韩国": "KR", - "香港": "CN_city_HK", # "HK", # - "越南": "VN", -} MACHINE_ID = None MAX_WORKERS = 10 @@ -658,8 +646,7 @@ def fetch_video_detail(task): "u_id": creator.get('id'), "u_xid": creator.get('xid'), "u_name": creator.get('name'), - "u_pic": avatar.get('url'), - "_region": proxies_address + "u_pic": avatar.get('url') } @@ -708,7 +695,7 @@ def integrate_data(): for index, (payload, kitem) in enumerate(keywords): try: global Gproxies - Gproxies = proxies_address[kitem['rn']] + Gproxies = kitem['rn'] v_list = get_searchInfo(kitem['keyword']) if not v_list: diff --git a/multi_proxy_refill.py b/multi_proxy_refill.py index 7e1569f..2e53a71 100644 --- a/multi_proxy_refill.py +++ b/multi_proxy_refill.py @@ -8,6 +8,7 @@ import redis import requests from requests import RequestException from urllib.parse import quote +import DB # —— 固定 Redis 连接配置 —— # _REDIS_CONF = { @@ -23,28 +24,14 @@ LOW_WATERMARK = 200 REFILL_BATCH = 1000 SLEEP_INTERVAL = 10 -# 区域映射 -PROXIES_ADDRESS = { - "印度尼西亚": "ID", - "马来": "MY", - "加拿大": "CA", - "台湾": "CN_city_TW", #"TW", # - "泰国": "TH", - "美国": "US", - "西班牙": "ES", - "韩国": "KR", - "香港": "CN_city_HK", #"HK", # - "越南": "VN", -} - - # 第三方 API 参数 PROXY_API_URL = "https://www.kookeey.net/pickdynamicips" ACCESS_ID = "2207189" SIGN = "10099426b05c7119e9c4dbd6a7a0aa4e" +db = DB.DBVidcon() -def fetch_proxies(region_name, n): +def fetch_proxies(region_code, n): """ 从第三方一次性请求 n 条代理,返回格式化列表 """ @@ -54,7 +41,7 @@ def fetch_proxies(region_name, n): "n": str(n), "p": "http", "gate": "sea", - "g": region_name, + "g": region_code, "r": "0", "type": "json", "sign": SIGN, @@ -67,12 +54,12 @@ def fetch_proxies(region_name, n): data = resp.json() except (RequestException, ValueError): time.sleep(1) - return fetch_proxies(region_name, n) + return fetch_proxies(region_code, n) arr = data.get("data") or [] if not arr: time.sleep(1) - return fetch_proxies(region_name, n) + return fetch_proxies(region_code, n) result = [] for item in arr: @@ -87,9 +74,6 @@ def fetch_proxies(region_name, n): def refill_queue(r: redis.Redis, region_name: str, region_code: str, low: int, batch: int): - """ - 如果 region 对应的队列长度 < low,就一次性补 batch 条 - """ key = f"{QUEUE_PREFIX}:{region_code}" length = r.llen(key) if length >= low: @@ -114,13 +98,13 @@ def main(): p.add_argument("--sleep", "-s", type=int, default=SLEEP_INTERVAL, help="检测间隔秒数") args = p.parse_args() - r = redis.Redis(**_REDIS_CONF) print(f"[*] 启动补给,监控前缀 `{QUEUE_PREFIX}:` 列表,低于 {args.low} 条即补至 {args.batch} 条") while True: try: - for region_name, region_code in PROXIES_ADDRESS.items(): + proxies_address = db.get_proxy_agent_dict() + for region_name, region_code in proxies_address.items(): refill_queue(r, region_name, region_code, args.low, args.batch) time.sleep(args.sleep) except Exception as e: