149 lines
4.8 KiB
Python
149 lines
4.8 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import time
|
|
import sys
|
|
import argparse
|
|
from urllib.parse import quote
|
|
import redis
|
|
import requests
|
|
from requests import RequestException
|
|
import DB
|
|
|
|
# ---- Fixed Redis connection settings ---- #
# NOTE(review): host and password are hard-coded in source; consider loading
# them from the environment or a secrets store.
_REDIS_CONF = dict(
    host="192.144.230.75",
    port=6379,
    password="qwert@$123!&",
    decode_responses=True,
)

# Thresholds and batch parameters.
QUEUE_PREFIX = "proxy_queue"  # Redis list keys are "<QUEUE_PREFIX>:<region_code>"
LOW_WATERMARK = 200           # default for --low: refill when a queue is shorter than this
REFILL_BATCH = 1000           # default for --batch: target queue length after a refill
SLEEP_INTERVAL = 10           # default for --sleep: seconds between polling cycles
|
|
|
|
# Module-level DB handle; main() calls db.get_proxy_agent_dict() on every polling cycle.
db = DB.DBVidcon()
|
|
|
|
def fetch_proxies3(region_code, n):
    """Fetch proxies for ``region_code`` from the Thordata get-ip API.

    The response body is treated as plain text with one proxy address per
    line; every non-empty line is returned prefixed with ``http://``.

    Transport failures and HTTP error statuses are retried once per second
    until a request succeeds, so this call blocks while the API is
    unreachable.

    Args:
        region_code: Value sent as the ``country`` query parameter.
        n: Value sent as the ``number`` query parameter.

    Returns:
        A list of ``"http://<line>"`` strings; empty when the body contains
        no non-empty lines.
    """
    url = "https://get-ip.thordata.net/api"
    params = {
        "td-customer": "Thor48890515",
        "sesstype": "2",
        "number": n,
        "country": region_code,
    }

    # Retry in a loop instead of recursing: an extended outage would otherwise
    # exhaust the interpreter's recursion limit.
    while True:
        try:
            # A timeout keeps a stalled connection from hanging the refill loop.
            resp = requests.get(url, params=params, timeout=10)
            # Do not enqueue the body of an HTTP error page as proxies.
            resp.raise_for_status()
        except (RequestException, ValueError):
            time.sleep(1)
            continue
        break

    # splitlines() accepts both "\r\n" and "\n"; blank lines (including the
    # one produced by a trailing newline) are dropped so that no bare
    # "http://" entry reaches the queue.
    lines = (line.strip() for line in resp.text.splitlines())
    return ["http://" + line for line in lines if line]
|
|
|
|
def fetch_proxies2(region_code, n):
    """Return ``n`` copies of the region's Thordata gateway URL.

    No network request is made: the URL is built from the lower-cased
    ``region_code`` and repeated ``n`` times.
    """
    gateway = "http://{}-pr.thordata.net:25000".format(region_code.lower())
    return [gateway] * n
|
|
|
|
|
|
def fetch_proxies(region_code, n):
    """Request ``n`` proxies from the kookeey API in one call.

    Request failures, HTTP error statuses, undecodable JSON and responses
    whose ``data`` field is missing or empty are all retried once per second
    until a non-empty batch is received, so this call blocks while the
    provider is unavailable.

    Args:
        region_code: Value sent as the ``g`` query parameter.
        n: Value sent as the ``n`` query parameter.

    Returns:
        A list of ``"http://<user>:<password>@<ip>:<port>"`` strings, with the
        user name and password percent-encoded.

    Raises:
        KeyError: If an entry of ``data`` lacks ``username``, ``password``,
            ``ip`` or ``port``.
    """
    headers = {
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "accept-language": "zh-CN,zh;q=0.9",
        "priority": "u=0, i",
        "sec-ch-ua": "\"Chromium\";v=\"136\", \"Microsoft Edge\";v=\"136\", \"Not.A/Brand\";v=\"99\"",
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": "\"Windows\"",
        "sec-fetch-dest": "document",
        "sec-fetch-mode": "navigate",
        "sec-fetch-site": "none",
        "sec-fetch-user": "?1",
        "upgrade-insecure-requests": "1",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36 Edg/136.0.0.0",
    }
    url = "https://www.kookeey.com/pickdynamicips"
    params = {
        "auth": "pwd",
        "format": "1",
        "n": n,
        "p": "http",
        "gate": "hk",
        "g": region_code,
        "r": "0",
        "type": "json",
        "sign": "10099426b05c7119e9c4dbd6a7a0aa4e",
        "accessid": "2207189",
        "dl": ",",
    }

    # Retry in a loop instead of recursing: an extended outage would otherwise
    # exhaust the interpreter's recursion limit.
    while True:
        try:
            resp = requests.get(url, headers=headers, params=params, timeout=10)
            resp.raise_for_status()
            data = resp.json()
        except (RequestException, ValueError):
            time.sleep(1)
            continue

        # A payload that is not a JSON object is treated like an empty batch
        # rather than raising AttributeError on .get().
        arr = data.get("data") if isinstance(data, dict) else None
        if arr:
            break
        time.sleep(1)

    result = []
    for item in arr:
        # safe="" also encodes "/" so credentials cannot break the URL.
        user = quote(item["username"], safe="")
        pwd = quote(item["password"], safe="")
        ip = item["ip"]
        port = item["port"]
        result.append(f"http://{user}:{pwd}@{ip}:{port}")
    return result
|
|
|
|
|
|
def refill_queue(r: redis.Redis, region_name: str, region_code: str,
                 low: int, batch: int):
    """Top up one region's proxy list in Redis when it runs low.

    Does nothing while the list ``"<QUEUE_PREFIX>:<region_code>"`` holds at
    least ``low`` entries. Otherwise ``batch - length`` proxies are requested
    via ``fetch_proxies3`` and appended with RPUSH.

    Args:
        r: Redis client used for LLEN / RPUSH.
        region_name: Not used by this function; kept for the caller's
            signature.
        region_code: Region code; forms the key suffix and is passed to the
            proxy provider.
        low: Low watermark; a refill is triggered below this length.
        batch: Target list length after a refill.
    """
    key = f"{QUEUE_PREFIX}:{region_code}"
    length = r.llen(key)
    if length >= low:
        return

    to_fetch = batch - length
    if to_fetch <= 0:
        # Only reachable when low > batch: the list is under the watermark
        # but already at the batch target, so there is nothing to request
        # (and a zero/negative count must not be sent to the provider).
        print(f"[{time.strftime('%H:%M:%S')}] {key} 长度 {length} 已达到批次目标 {batch},跳过补给")
        return

    print(f"[{time.strftime('%H:%M:%S')}] {key} 长度 {length} < {low},一次性拉取 {to_fetch} 条…")
    proxies = fetch_proxies3(region_code, to_fetch)
    if proxies:
        r.rpush(key, *proxies)
        print(f"[{time.strftime('%H:%M:%S')}] 已入队 {len(proxies)} 条 → 新长度 {r.llen(key)}")
    else:
        print(f"[{time.strftime('%H:%M:%S')}] 拉取失败,无数据入队")
|
|
|
|
|
|
def main():
    """Parse CLI options and run the refill loop forever.

    Each cycle reads the region mapping from ``db.get_proxy_agent_dict()``,
    calls ``refill_queue`` for every ``(region_name, region_code)`` pair and
    then sleeps for ``--sleep`` seconds. Any ``Exception`` raised during a
    cycle is reported on stderr and followed by the same sleep, so the loop
    keeps running.
    """
    parser = argparse.ArgumentParser(description="多地区代理队列自动补给")
    parser.add_argument(
        "--low", "-l",
        type=int,
        default=LOW_WATERMARK,
        help="低水位阈值,队列长度低于此值时触发补给",
    )
    parser.add_argument(
        "--batch", "-b",
        type=int,
        default=REFILL_BATCH,
        help="单次补给目标条数",
    )
    parser.add_argument(
        "--sleep", "-s",
        type=int,
        default=SLEEP_INTERVAL,
        help="检测间隔秒数",
    )
    args = parser.parse_args()

    client = redis.Redis(**_REDIS_CONF)
    print(f"[*] 启动补给,监控前缀 `{QUEUE_PREFIX}:<code>` 列表,低于 {args.low} 条即补至 {args.batch} 条")

    while True:
        try:
            # The mapping is re-read every cycle, then each region is checked.
            regions = db.get_proxy_agent_dict()
            for region_name, region_code in regions.items():
                refill_queue(client, region_name, region_code, args.low, args.batch)
            time.sleep(args.sleep)
        except Exception as exc:
            # Round-trip through UTF-8 with "ignore" to drop characters that
            # cannot be encoded before writing to stderr.
            text = str(exc).encode("utf-8", "ignore").decode()
            stamp = time.strftime('%H:%M:%S')
            print(f"[ERROR {stamp}] {text}", file=sys.stderr)
            time.sleep(args.sleep)
|
|
|
|
|
|
# Start the refill loop only when the file is executed as a script.
if __name__ == "__main__":
    main()