DailyMotion/multi_proxy_refill.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import sys
import argparse
from urllib.parse import quote
import redis
import requests
from requests import RequestException
import DB
# —— Fixed Redis connection settings —— #
_REDIS_CONF = {
    "host": "192.144.230.75",
    "port": 6379,
    "password": "qwert@$123!&",
    "decode_responses": True,
}
# Watermark and batch-size parameters
QUEUE_PREFIX = "proxy_queue"
LOW_WATERMARK = 200
REFILL_BATCH = 1000
SLEEP_INTERVAL = 10
db = DB.DBVidcon()
def fetch_proxies3(region_code, n):
    url = "https://get-ip.thordata.net/api"
    params = {
        "td-customer": "Thor48890515",
        "sesstype": "2",
        "number": n,
        "country": region_code
    }
    try:
        resp = requests.get(url, params=params, timeout=10)
    except (RequestException, ValueError):
        time.sleep(1)
        return fetch_proxies3(region_code, n)
    # The API returns one "ip:port" per line; skip empty trailing lines.
    result = ["http://" + item for item in resp.text.split("\r\n") if item.strip()]
    return result
def fetch_proxies2(region_code, n):
    url = f"http://api.ipipgo.com/ip?cty={region_code}&c={n}&pt=1&ft=json&pat=\n&rep=1&key=625ce417"
    try:
        res = requests.get(url, timeout=10)
        data_json = res.json()
        print(data_json)
    except (RequestException, ValueError):
        time.sleep(1)
        # Fall back to the ipidea provider on request/parse failure.
        return fetch_proxies1(region_code, n)
    arr = data_json.get("data") or []
    if not arr:
        time.sleep(1)
        return fetch_proxies1(region_code, n)
    result = []
    for item in arr:
        ip = item["ip"]
        port = item["port"]
        result.append(f"http://{ip}:{port}")
    return result
def fetch_proxies4(region_code, n):
    url = f"http://api.proxy.roxlabs.io/getProxyIp?num={n}&return_type=json&lb=1&sb=&flow=1&regions={region_code}&protocol=socks5"
    try:
        res = requests.get(url, timeout=10)
        data_json = res.json()
    except (RequestException, ValueError):
        time.sleep(1)
        # Fall back to the ipidea provider on request/parse failure.
        return fetch_proxies1(region_code, n)
    arr = data_json.get("data") or []
    if not arr:
        time.sleep(1)
        return fetch_proxies1(region_code, n)
    result = []
    for item in arr:
        ip = item["ip"]
        port = item["port"]
        result.append(f"socks5h://{ip}:{port}")
    return result
def fetch_proxies1(region_code, n):
    url = f"http://api.proxy.ipidea.io/getProxyIp?num={n}&return_type=json&lb=1&sb=0&flow=1&regions={region_code}&protocol=http"
    try:
        res = requests.get(url, timeout=10)
        data_json = res.json()
    except (RequestException, ValueError):
        time.sleep(1)
        return fetch_proxies1(region_code, n)
    arr = data_json.get("data") or []
    if not arr:
        time.sleep(1)
        return fetch_proxies1(region_code, n)
    result = []
    for item in arr:
        ip = item["ip"]
        port = item["port"]
        result.append(f"http://{ip}:{port}")
    return result
def fetch_proxies(region_code, n):
    """
    Request n proxies from the third-party provider in a single call and
    return them as a list of formatted proxy URLs.
    """
    headers = {
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "accept-language": "zh-CN,zh;q=0.9",
        "priority": "u=0, i",
        "sec-ch-ua": "\"Chromium\";v=\"136\", \"Microsoft Edge\";v=\"136\", \"Not.A/Brand\";v=\"99\"",
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": "\"Windows\"",
        "sec-fetch-dest": "document",
        "sec-fetch-mode": "navigate",
        "sec-fetch-site": "none",
        "sec-fetch-user": "?1",
        "upgrade-insecure-requests": "1",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36 Edg/136.0.0.0"
    }
    url = "https://www.kookeey.net/pickdynamicips"
    params = {
        "auth": "pwd",
        "format": "1",
        "n": n,
        "p": "http",
        "gate": "hk",
        "g": region_code,
        "r": "0",
        "type": "json",
        "sign": "10099426b05c7119e9c4dbd6a7a0aa4e",
        "accessid": "2207189",
        "dl": ","
    }
    try:
        resp = requests.get(url, headers=headers, params=params, timeout=10)
        resp.raise_for_status()
        data = resp.json()
    except (RequestException, ValueError):
        time.sleep(1)
        return fetch_proxies(region_code, n)
    arr = data.get("data") or []
    if not arr:
        time.sleep(1)
        # Fall back to the ipidea provider when this one returns no proxies.
        return fetch_proxies1(region_code, n)
    result = []
    for item in arr:
        # Each item carries username/password/ip/port and is formatted as
        # "http://user:pwd@ip:port".
        user = quote(item["username"], safe="")
        pwd = quote(item["password"], safe="")
        ip = item["ip"]
        port = item["port"]
        result.append(f"http://{user}:{pwd}@{ip}:{port}")
        # result.append(f"http://{ip}:{port}")
    return result
def refill_queue(r: redis.Redis, region_name: str, region_code: str,
                 low: int, batch: int):
    key = f"{QUEUE_PREFIX}:{region_code}"
    length = r.llen(key)
    if length >= low:
        return
    to_fetch = batch - length
    print(f"[{time.strftime('%H:%M:%S')}] {key} length {length} < {low}, fetching {to_fetch} proxies in one batch…")
    proxies = fetch_proxies2(region_code, to_fetch)
    if proxies:
        r.rpush(key, *proxies)
        print(f"[{time.strftime('%H:%M:%S')}] enqueued {len(proxies)} → new length {r.llen(key)}")
    else:
        print(f"[{time.strftime('%H:%M:%S')}] fetch failed, nothing enqueued")
def main():
    p = argparse.ArgumentParser(description="Automatic refill for multi-region proxy queues")
    p.add_argument("--low", "-l", type=int, default=LOW_WATERMARK,
                   help="low-watermark threshold; refill is triggered when the queue is shorter than this")
    p.add_argument("--batch", "-b", type=int, default=REFILL_BATCH,
                   help="target number of proxies per refill")
    p.add_argument("--sleep", "-s", type=int, default=SLEEP_INTERVAL,
                   help="polling interval in seconds")
    args = p.parse_args()
    r = redis.Redis(**_REDIS_CONF)
    print(f"[*] Refill started: watching `{QUEUE_PREFIX}:<code>` lists; any list below {args.low} is topped up to {args.batch}")
    while True:
        try:
            proxies_address = db.get_proxy_agent_dict()
            for region_name, region_code in proxies_address.items():
                refill_queue(r, region_name, region_code, args.low, args.batch)
            time.sleep(args.sleep)
        except Exception as e:
            msg = str(e).encode("utf-8", "ignore").decode()
            print(f"[ERROR {time.strftime('%H:%M:%S')}] {msg}", file=sys.stderr)
            time.sleep(args.sleep)


if __name__ == "__main__":
    main()
# print(fetch_proxies2('TW', 2))
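# Example invocation (flags as defined in main() above):
#   python3 multi_proxy_refill.py --low 200 --batch 1000 --sleep 10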