134 lines
3.8 KiB
Python
134 lines
3.8 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import time
|
||
import sys
|
||
import argparse
|
||
import redis
|
||
import requests
|
||
from requests import RequestException
|
||
from urllib.parse import quote
|
||
|
||
# —— 固定 Redis 连接配置 —— #
|
||
_REDIS_CONF = {
|
||
"host": "127.0.0.1",
|
||
"port": 6379,
|
||
"password": "qwert@$123!&",
|
||
"decode_responses": True,
|
||
}
|
||
|
||
# 阈值与批次参数
|
||
QUEUE_PREFIX = "proxy_queue"
|
||
LOW_WATERMARK = 200
|
||
REFILL_BATCH = 1000
|
||
SLEEP_INTERVAL = 10
|
||
|
||
# 区域映射
|
||
PROXIES_ADDRESS = {
|
||
"印度尼西亚": "ID",
|
||
"马来": "MY",
|
||
"加拿大": "CA",
|
||
"台湾": "CN_city_TW", #"TW", #
|
||
"泰国": "TH",
|
||
"美国": "US",
|
||
"西班牙": "ES",
|
||
"韩国": "KR",
|
||
"香港": "CN_city_HK", #"HK", #
|
||
"越南": "VN",
|
||
}
|
||
|
||
|
||
# 第三方 API 参数
|
||
PROXY_API_URL = "https://www.kookeey.com/pickdynamicips"
|
||
ACCESS_ID = "2207189"
|
||
SIGN = "10099426b05c7119e9c4dbd6a7a0aa4e"
|
||
|
||
|
||
def fetch_proxies(region_name, n):
|
||
"""
|
||
从第三方一次性请求 n 条代理,返回格式化列表
|
||
"""
|
||
params = {
|
||
"auth": "pwd",
|
||
"format": "1",
|
||
"n": str(n),
|
||
"p": "http",
|
||
"gate": "sea",
|
||
"g": region_name,
|
||
"r": "0",
|
||
"type": "json",
|
||
"sign": SIGN,
|
||
"accessid": ACCESS_ID,
|
||
"dl": ",",
|
||
}
|
||
try:
|
||
resp = requests.get(PROXY_API_URL, params=params, timeout=10)
|
||
resp.raise_for_status()
|
||
data = resp.json()
|
||
except (RequestException, ValueError):
|
||
time.sleep(1)
|
||
return fetch_proxies(region_name, n)
|
||
|
||
arr = data.get("data") or []
|
||
if not arr:
|
||
time.sleep(1)
|
||
return fetch_proxies(region_name, n)
|
||
|
||
result = []
|
||
for item in arr:
|
||
user = quote(item["username"], safe="")
|
||
pwd = quote(item["password"], safe="")
|
||
ip = item["ip"]
|
||
port = item["port"]
|
||
result.append(f"http://{user}:{pwd}@{ip}:{port}")
|
||
|
||
return result
|
||
|
||
|
||
def refill_queue(r: redis.Redis, region_name: str, region_code: str,
|
||
low: int, batch: int):
|
||
"""
|
||
如果 region 对应的队列长度 < low,就一次性补 batch 条
|
||
"""
|
||
key = f"{QUEUE_PREFIX}:{region_code}"
|
||
length = r.llen(key)
|
||
if length >= low:
|
||
return
|
||
|
||
to_fetch = batch - length
|
||
print(f"[{time.strftime('%H:%M:%S')}] {key} 长度 {length} < {low},一次性拉取 {to_fetch} 条…")
|
||
proxies = fetch_proxies(region_code, to_fetch)
|
||
if proxies:
|
||
r.rpush(key, *proxies)
|
||
print(f"[{time.strftime('%H:%M:%S')}] 已入队 {len(proxies)} 条 → 新长度 {r.llen(key)}")
|
||
else:
|
||
print(f"[{time.strftime('%H:%M:%S')}] 拉取失败,无数据入队")
|
||
|
||
|
||
def main():
|
||
p = argparse.ArgumentParser(description="多地区代理队列自动补给")
|
||
p.add_argument("--low", "-l", type=int, default=LOW_WATERMARK,
|
||
help="低水位阈值,队列长度低于此值时触发补给")
|
||
p.add_argument("--batch", "-b", type=int, default=REFILL_BATCH,
|
||
help="单次补给目标条数")
|
||
p.add_argument("--sleep", "-s", type=int, default=SLEEP_INTERVAL,
|
||
help="检测间隔秒数")
|
||
args = p.parse_args()
|
||
|
||
r = redis.Redis(**_REDIS_CONF)
|
||
print(f"[*] 启动补给,监控前缀 `{QUEUE_PREFIX}:<code>` 列表,低于 {args.low} 条即补至 {args.batch} 条")
|
||
|
||
while True:
|
||
try:
|
||
for region_name, region_code in PROXIES_ADDRESS.items():
|
||
refill_queue(r, region_name, region_code, args.low, args.batch)
|
||
time.sleep(args.sleep)
|
||
except Exception as e:
|
||
msg = str(e).encode("utf-8", "ignore").decode()
|
||
print(f"[ERROR {time.strftime('%H:%M:%S')}] {msg}", file=sys.stderr)
|
||
time.sleep(args.sleep)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|