135 lines
4.0 KiB
Python
135 lines
4.0 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import argparse
import os
import sys
import time

import redis
import requests
from requests import RequestException

import DB
|
|
|
|
# —— Redis connection settings —— #
# Each connection value can be overridden through an environment variable so a
# deployment does not have to edit this file. The literals below are the
# previously hard-coded values, kept as defaults for backward compatibility.
# NOTE(review): the default password is a credential committed to source
# control — rotate it and drop the fallback once deployments set
# PROXY_REDIS_PASSWORD.
_REDIS_CONF = {
    "host": os.environ.get("PROXY_REDIS_HOST", "192.144.230.75"),
    "port": int(os.environ.get("PROXY_REDIS_PORT", "6379")),
    "password": os.environ.get("PROXY_REDIS_PASSWORD", "qwert@$123!&"),
    # Have the client return str instead of bytes.
    "decode_responses": True,
}
|
|
|
|
# Thresholds and batch parameters
QUEUE_PREFIX = "proxy_queue"  # Redis list keys are f"{QUEUE_PREFIX}:{region_code}"
LOW_WATERMARK = 200  # default for --low
REFILL_BATCH = 1000  # default for --batch
SLEEP_INTERVAL = 10  # default for --sleep, in seconds

# Third-party API parameters
PROXY_API_URL = "http://api.proxy.roxlabs.io/getProxyIp"  # endpoint queried by fetch_proxies()
# NOTE(review): ACCESS_ID and SIGN are not referenced by any function visible
# in this file — confirm whether they are still needed; they look like
# account credentials and should not live in source if so.
ACCESS_ID = "2207189"
SIGN = "10099426b05c7119e9c4dbd6a7a0aa4e"

# Module-level database handle; main() calls db.get_proxy_agent_dict() on it.
db = DB.DBVidcon()
|
|
|
|
def fetch_proxies3(region_code, n):
    """Fetch up to ``n`` proxy addresses for ``region_code`` from the Thordata API.

    The response body is treated as plain text with one proxy per line.
    Transport-level failures (connection errors, timeouts) are retried once
    per second without limit.

    Args:
        region_code: Value sent as the ``country`` query parameter.
        n: Value sent as the ``number`` query parameter.

    Returns:
        list[str]: The non-blank lines of the response body, stripped of
        surrounding whitespace. Empty when the server answers with a
        non-success HTTP status.
    """
    url = "https://get-ip.thordata.net/api"
    params = {
        "td-customer": "Thor48890515",
        "sesstype": "2",
        "number": n,
        "country": region_code,
    }

    # Retry in a loop instead of recursing: during a long outage recursion
    # would eventually hit the interpreter's recursion limit and crash.
    while True:
        try:
            # Without a timeout a stalled connection would block forever.
            resp = requests.get(url, params=params, timeout=10)
        except (RequestException, ValueError):
            time.sleep(1)
            continue
        break

    if not resp.ok:
        # An error page is not a proxy list; report "nothing fetched" so the
        # caller can log the failure and try again on its next pass.
        return []

    # splitlines() copes with both "\r\n" and "\n"; blank lines (such as the
    # one after a trailing newline) must not be returned as proxies.
    return [line.strip() for line in resp.text.splitlines() if line.strip()]
|
|
|
|
def fetch_proxies2(region_code, n):
    """Return ``n`` copies of the gateway URL built from ``region_code``.

    No request is made; the URL is assembled from the lower-cased region code
    and repeated ``n`` times.
    """
    gateway = "".join(("http://", region_code.lower(), "-pr.thordata.net:25000"))
    return [gateway] * n
|
|
|
|
|
|
def fetch_proxies(region_code, n):
    """Request ``n`` proxies from the third-party API in a single call.

    The call is repeated once per second, without limit, until the API
    returns a JSON object whose ``"data"`` entry is non-empty.

    Args:
        region_code: Value sent as the ``regions`` query parameter.
        n: Value sent as the ``num`` query parameter.

    Returns:
        list[str]: One ``"http://{ip}:{port}"`` string per item in the
        response's ``"data"`` list.

    Raises:
        KeyError: If an item lacks the ``"ip"`` or ``"port"`` key.
    """
    params = {
        "num": n,
        "return_type": "json",
        "lb": "4",
        "sb": "",
        "flow": "1",
        "regions": region_code,
        "protocol": "http",
    }

    # Retry in a loop instead of recursing: during a long outage recursion
    # would eventually hit the interpreter's recursion limit and crash.
    while True:
        try:
            resp = requests.get(PROXY_API_URL, params=params, timeout=10)
            resp.raise_for_status()
            data = resp.json()
        except (RequestException, ValueError):
            data = None

        # A payload that is not a JSON object is treated like an empty
        # answer rather than raising AttributeError on ``.get``.
        arr = data.get("data") if isinstance(data, dict) else None
        if arr:
            break
        time.sleep(1)

    return [f"http://{item['ip']}:{item['port']}" for item in arr]
|
|
|
|
|
|
def refill_queue(r: redis.Redis, region_name: str, region_code: str,
                 low: int, batch: int):
    """Top up one region's proxy list when it is shorter than ``low``.

    Args:
        r: Redis client used to read the list length and push new entries.
        region_name: Not used by this function; kept for the caller's
            signature.
        region_code: Suffix of the Redis key and the region passed to
            ``fetch_proxies3``.
        low: The list is left alone when its length is at least this value.
        batch: Target length; ``batch - length`` proxies are requested.

    Returns:
        None.
    """
    key = f"{QUEUE_PREFIX}:{region_code}"
    length = r.llen(key)
    if length >= low:
        return

    to_fetch = batch - length
    if to_fetch <= 0:
        # Only reachable when batch <= low: the list already holds the target
        # amount, and a zero or negative count must not be sent to the API.
        return

    print(f"[{time.strftime('%H:%M:%S')}] {key} 长度 {length} < {low},一次性拉取 {to_fetch} 条…")
    # Drop blank entries so an empty response counts as a failed fetch
    # instead of pushing "" onto the queue.
    proxies = [p for p in fetch_proxies3(region_code, to_fetch) if p and p.strip()]
    if proxies:
        r.rpush(key, *proxies)
        print(f"[{time.strftime('%H:%M:%S')}] 已入队 {len(proxies)} 条 → 新长度 {r.llen(key)}")
    else:
        print(f"[{time.strftime('%H:%M:%S')}] 拉取失败,无数据入队")
|
|
|
|
|
|
def main():
    """Parse the command-line options and poll every region's queue forever.

    Each pass reads the region mapping from ``db.get_proxy_agent_dict()``,
    calls ``refill_queue`` for every entry and then sleeps. Any ``Exception``
    raised during a pass is printed to stderr, followed by the same sleep,
    and the loop continues.
    """
    parser = argparse.ArgumentParser(description="多地区代理队列自动补给")
    parser.add_argument(
        "--low", "-l",
        type=int,
        default=LOW_WATERMARK,
        help="低水位阈值,队列长度低于此值时触发补给",
    )
    parser.add_argument(
        "--batch", "-b",
        type=int,
        default=REFILL_BATCH,
        help="单次补给目标条数",
    )
    parser.add_argument(
        "--sleep", "-s",
        type=int,
        default=SLEEP_INTERVAL,
        help="检测间隔秒数",
    )
    opts = parser.parse_args()

    client = redis.Redis(**_REDIS_CONF)
    print(f"[*] 启动补给,监控前缀 `{QUEUE_PREFIX}:<code>` 列表,低于 {opts.low} 条即补至 {opts.batch} 条")

    while True:
        try:
            regions = db.get_proxy_agent_dict()
            for name, code in regions.items():
                refill_queue(client, name, code, opts.low, opts.batch)
            time.sleep(opts.sleep)
        except Exception as exc:
            # Round-trip through UTF-8 with "ignore" to drop characters that
            # cannot be encoded before printing the message.
            text = str(exc).encode("utf-8", "ignore").decode()
            print(f"[ERROR {time.strftime('%H:%M:%S')}] {text}", file=sys.stderr)
            time.sleep(opts.sleep)
|
|
|
|
|
|
if __name__ == "__main__":
    # NOTE(review): the daemon entry point below is commented out, so running
    # this script performs a single fetch_proxies3 call and discards its
    # result. Confirm whether main() should be re-enabled.
    # main()
    fetch_proxies3('HK', 10)