DailyMotion/multi_proxy_refill.py

121 lines
3.5 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import sys
import argparse
import redis
import requests
from requests import RequestException
from urllib.parse import quote
import DB
# ---- Fixed Redis connection settings ----
# These keyword arguments are unpacked into redis.Redis(...) by main().
# NOTE(review): host and password are hard-coded in source; consider loading
# them from the environment or a secrets store instead.
_REDIS_CONF = dict(
    host="192.144.230.75",
    port=6379,
    password="qwert@$123!&",
    decode_responses=True,
)

# ---- Thresholds and batch parameters ----
# Prefix of the per-region Redis list keys ("<prefix>:<region_code>").
QUEUE_PREFIX = "proxy_queue"
# Default for --low: a queue shorter than this triggers a refill.
LOW_WATERMARK = 200
# Default for --batch: target queue length after a refill.
REFILL_BATCH = 1000
# Default for --sleep: seconds to wait between polling rounds.
SLEEP_INTERVAL = 10

# ---- Third-party proxy API parameters ----
PROXY_API_URL = "http://api.proxy.roxlabs.io/getProxyIp"
# NOTE(review): ACCESS_ID and SIGN are not referenced anywhere in the visible
# part of this file - confirm whether the API call is supposed to send them.
ACCESS_ID = "2207189"
SIGN = "10099426b05c7119e9c4dbd6a7a0aa4e"
# Module-level database handle, created when this module is imported; main()
# calls its get_proxy_agent_dict() on every polling round.
# NOTE(review): DBVidcon lives in the project's DB module, which is not shown
# here - presumably it opens a connection on construction; confirm.
db = DB.DBVidcon()
def fetch_proxies2(region_code, n):
    """Return ``n`` copies of the region-specific thordata.net proxy URL.

    No network request is made: the URL is assembled from the lower-cased
    ``region_code`` and the same string is repeated ``n`` times. A
    non-positive ``n`` yields an empty list.
    """
    endpoint = "".join(("http://", region_code.lower(), "-pr.thordata.net:25000"))
    return [endpoint] * n
def fetch_proxies(region_code, n, max_retries=None):
    """Request ``n`` proxies from the third-party API and format them as URLs.

    The request is retried, with a one-second pause between attempts, whenever
    the HTTP call fails, the body is not valid JSON, or the response carries
    no entries under its ``"data"`` key.

    Retries are performed in a loop rather than by self-recursion, so a long
    outage can no longer end in ``RecursionError``.

    Args:
        region_code: Region identifier sent as the ``regions`` query parameter.
        n: Number of proxies to request (``num`` query parameter).
        max_retries: Maximum number of retries after the first attempt.
            ``None`` (the default) keeps retrying until the API returns data.

    Returns:
        A list of ``"http://<ip>:<port>"`` strings, or an empty list when
        ``max_retries`` is set and has been exhausted.

    Raises:
        KeyError: If a returned entry lacks the ``"ip"`` or ``"port"`` key.
    """
    # NOTE(review): the module-level ACCESS_ID and SIGN are not sent with this
    # request - confirm whether the API requires them.
    params = {
        "num": n,
        "return_type": "json",
        "lb": "4",
        "sb": "",
        "flow": "1",
        "regions": region_code,
        "protocol": "http",
    }

    retries = 0
    while True:
        try:
            resp = requests.get(PROXY_API_URL, params=params, timeout=10)
            resp.raise_for_status()
            data = resp.json()
        except (RequestException, ValueError):
            # Network/HTTP failure or a non-JSON body: treat as "no data".
            data = None

        # Guard against a JSON payload that is not an object (e.g. a list).
        entries = data.get("data") if isinstance(data, dict) else None
        if entries:
            # Only ip:port is used; per-entry credentials are not embedded.
            return [f"http://{item['ip']}:{item['port']}" for item in entries]

        if max_retries is not None and retries >= max_retries:
            return []
        retries += 1
        time.sleep(1)
def refill_queue(r: redis.Redis, region_name: str, region_code: str,
                 low: int, batch: int):
    """Top up one region's proxy list in Redis when it has run low.

    The list key is ``"<QUEUE_PREFIX>:<region_code>"``. Nothing happens while
    the list holds at least ``low`` entries; otherwise enough proxies are
    fetched to bring it up to ``batch`` entries and appended with ``RPUSH``.

    Args:
        r: Redis client used for ``llen`` / ``rpush``.
        region_name: Human-readable region name (currently unused).
        region_code: Region code used in the Redis key and the proxy URL.
        low: Low-watermark; a refill is triggered below this length.
        batch: Target list length after a refill.
    """
    key = f"{QUEUE_PREFIX}:{region_code}"
    length = r.llen(key)
    if length >= low:
        return

    to_fetch = batch - length
    if to_fetch <= 0:
        # Reached when ``low`` is configured above ``batch``: the list is
        # already at or beyond the target, so there is nothing to add. Without
        # this guard a non-positive count would be requested and a misleading
        # failure message logged on every polling round.
        return

    print(f"[{time.strftime('%H:%M:%S')}] {key} 长度 {length} < {low},一次性拉取 {to_fetch} 条…")
    proxies = fetch_proxies2(region_code, to_fetch)
    if proxies:
        r.rpush(key, *proxies)
        print(f"[{time.strftime('%H:%M:%S')}] 已入队 {len(proxies)} 条 → 新长度 {r.llen(key)}")
    else:
        print(f"[{time.strftime('%H:%M:%S')}] 拉取失败,无数据入队")
def main():
    """Parse CLI options and run the refill loop forever.

    Each round reads the region mapping from ``db.get_proxy_agent_dict()``,
    calls ``refill_queue`` for every ``(region_name, region_code)`` pair and
    then sleeps for the configured interval. Any ``Exception`` raised during a
    round is printed to stderr and followed by the same sleep, so the loop
    keeps running.
    """
    parser = argparse.ArgumentParser(description="多地区代理队列自动补给")
    # (long flag, short flag, default, help text) - all options are integers.
    int_options = (
        ("--low", "-l", LOW_WATERMARK, "低水位阈值,队列长度低于此值时触发补给"),
        ("--batch", "-b", REFILL_BATCH, "单次补给目标条数"),
        ("--sleep", "-s", SLEEP_INTERVAL, "检测间隔秒数"),
    )
    for long_flag, short_flag, default_value, help_text in int_options:
        parser.add_argument(long_flag, short_flag, type=int,
                            default=default_value, help=help_text)
    args = parser.parse_args()
    low_mark, target, interval = args.low, args.batch, args.sleep

    client = redis.Redis(**_REDIS_CONF)
    print(f"[*] 启动补给,监控前缀 `{QUEUE_PREFIX}:<code>` 列表,低于 {low_mark} 条即补至 {target}")

    while True:
        try:
            # Re-read the region mapping every round so changes are picked up.
            regions = db.get_proxy_agent_dict()
            for name, code in regions.items():
                refill_queue(client, name, code, low_mark, target)
            time.sleep(interval)
        except Exception as exc:
            # Drop characters that cannot be encoded as UTF-8 before printing.
            text = str(exc).encode("utf-8", "ignore").decode()
            print(f"[ERROR {time.strftime('%H:%M:%S')}] {text}", file=sys.stderr)
            time.sleep(interval)


# Run the refill loop only when executed as a script, not on import.
if __name__ == "__main__":
    main()