DailyMotion/multi_proxy_refill.py
2025-05-17 01:54:29 +08:00

134 lines
3.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import sys
import argparse
import redis
import requests
from requests import RequestException
from urllib.parse import quote
# —— 固定 Redis 连接配置 —— #
_REDIS_CONF = {
"host": "192.144.230.75",
"port": 6379,
"password": "qwert@$123!&",
"decode_responses": True,
}
# 阈值与批次参数
QUEUE_PREFIX = "proxy_queue"
LOW_WATERMARK = 200
REFILL_BATCH = 1000
SLEEP_INTERVAL = 10
# 区域映射
PROXIES_ADDRESS = {
"印度尼西亚": "ID",
"马来": "MY",
"加拿大": "CA",
"台湾": "CN_city_TW", #"TW", #
"泰国": "TH",
"美国": "US",
"西班牙": "ES",
"韩国": "KR",
"香港": "CN_city_HK", #"HK", #
"越南": "VN",
}
# 第三方 API 参数
PROXY_API_URL = "https://www.kookeey.net/pickdynamicips"
ACCESS_ID = "2207189"
SIGN = "10099426b05c7119e9c4dbd6a7a0aa4e"
def fetch_proxies(region_name, n):
"""
从第三方一次性请求 n 条代理,返回格式化列表
"""
params = {
"auth": "pwd",
"format": "1",
"n": str(n),
"p": "http",
"gate": "sea",
"g": region_name,
"r": "0",
"type": "json",
"sign": SIGN,
"accessid": ACCESS_ID,
"dl": ",",
}
try:
resp = requests.get(PROXY_API_URL, params=params, timeout=10)
resp.raise_for_status()
data = resp.json()
except (RequestException, ValueError):
time.sleep(1)
return fetch_proxies(region_name, n)
arr = data.get("data") or []
if not arr:
time.sleep(1)
return fetch_proxies(region_name, n)
result = []
for item in arr:
user = quote(item["username"], safe="")
pwd = quote(item["password"], safe="")
ip = item["ip"]
port = item["port"]
result.append(f"http://{user}:{pwd}@{ip}:{port}")
return result
def refill_queue(r: redis.Redis, region_name: str, region_code: str,
low: int, batch: int):
"""
如果 region 对应的队列长度 < low就一次性补 batch 条
"""
key = f"{QUEUE_PREFIX}:{region_code}"
length = r.llen(key)
if length >= low:
return
to_fetch = batch - length
print(f"[{time.strftime('%H:%M:%S')}] {key} 长度 {length} < {low},一次性拉取 {to_fetch} 条…")
proxies = fetch_proxies(region_code, to_fetch)
if proxies:
r.rpush(key, *proxies)
print(f"[{time.strftime('%H:%M:%S')}] 已入队 {len(proxies)} 条 → 新长度 {r.llen(key)}")
else:
print(f"[{time.strftime('%H:%M:%S')}] 拉取失败,无数据入队")
def main():
p = argparse.ArgumentParser(description="多地区代理队列自动补给")
p.add_argument("--low", "-l", type=int, default=LOW_WATERMARK,
help="低水位阈值,队列长度低于此值时触发补给")
p.add_argument("--batch", "-b", type=int, default=REFILL_BATCH,
help="单次补给目标条数")
p.add_argument("--sleep", "-s", type=int, default=SLEEP_INTERVAL,
help="检测间隔秒数")
args = p.parse_args()
r = redis.Redis(**_REDIS_CONF)
print(f"[*] 启动补给,监控前缀 `{QUEUE_PREFIX}:<code>` 列表,低于 {args.low} 条即补至 {args.batch}")
while True:
try:
for region_name, region_code in PROXIES_ADDRESS.items():
refill_queue(r, region_name, region_code, args.low, args.batch)
time.sleep(args.sleep)
except Exception as e:
msg = str(e).encode("utf-8", "ignore").decode()
print(f"[ERROR {time.strftime('%H:%M:%S')}] {msg}", file=sys.stderr)
time.sleep(args.sleep)
if __name__ == "__main__":
main()