feat: initialize the DailyMotion distributed crawler project

晓丰 2025-05-16 22:16:26 +08:00
commit 14023c3b48
17 changed files with 1635 additions and 0 deletions

8
.idea/.gitignore generated vendored Normal file
@@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

14
.idea/DailyMotion.iml generated Normal file
@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/.venv" />
</content>
<orderEntry type="jdk" jdkName="Crawler" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
</module>

67
.idea/inspectionProfiles/Project_Default.xml generated Normal file
@@ -0,0 +1,67 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="DuplicatedCode" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<Languages>
<language minSize="59" name="Python" />
</Languages>
</inspection_tool>
<inspection_tool class="PyPackageRequirementsInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredPackages">
<value>
<list size="30">
<item index="0" class="java.lang.String" itemvalue="protobuf" />
<item index="1" class="java.lang.String" itemvalue="tifffile" />
<item index="2" class="java.lang.String" itemvalue="cffi" />
<item index="3" class="java.lang.String" itemvalue="kiwisolver" />
<item index="4" class="java.lang.String" itemvalue="numpy" />
<item index="5" class="java.lang.String" itemvalue="RapidFuzz" />
<item index="6" class="java.lang.String" itemvalue="chardet" />
<item index="7" class="java.lang.String" itemvalue="fonttools" />
<item index="8" class="java.lang.String" itemvalue="onnxruntime" />
<item index="9" class="java.lang.String" itemvalue="charset-normalizer" />
<item index="10" class="java.lang.String" itemvalue="pythonnet" />
<item index="11" class="java.lang.String" itemvalue="pyreadline3" />
<item index="12" class="java.lang.String" itemvalue="soupsieve" />
<item index="13" class="java.lang.String" itemvalue="urllib3" />
<item index="14" class="java.lang.String" itemvalue="sympy" />
<item index="15" class="java.lang.String" itemvalue="idna" />
<item index="16" class="java.lang.String" itemvalue="greenlet" />
<item index="17" class="java.lang.String" itemvalue="playwright" />
<item index="18" class="java.lang.String" itemvalue="pyee" />
<item index="19" class="java.lang.String" itemvalue="pandas" />
<item index="20" class="java.lang.String" itemvalue="async-timeout" />
<item index="21" class="java.lang.String" itemvalue="pkg_resources" />
<item index="22" class="java.lang.String" itemvalue="zipp" />
<item index="23" class="java.lang.String" itemvalue="typing_extensions" />
<item index="24" class="java.lang.String" itemvalue="et-xmlfile" />
<item index="25" class="java.lang.String" itemvalue="pytz" />
<item index="26" class="java.lang.String" itemvalue="requests" />
<item index="27" class="java.lang.String" itemvalue="importlib-metadata" />
<item index="28" class="java.lang.String" itemvalue="redis" />
<item index="29" class="java.lang.String" itemvalue="openpyxl" />
</list>
</value>
</option>
</inspection_tool>
<inspection_tool class="PyPep8NamingInspection" enabled="true" level="WEAK WARNING" enabled_by_default="true">
<option name="ignoredErrors">
<list>
<option value="N806" />
</list>
</option>
</inspection_tool>
<inspection_tool class="PyUnresolvedReferencesInspection" enabled="true" level="WARNING" enabled_by_default="true">
<option name="ignoredIdentifiers">
<list>
<option value="Config.MapsAddress.np" />
</list>
</option>
</inspection_tool>
<inspection_tool class="SpellCheckingInspection" enabled="false" level="TYPO" enabled_by_default="false">
<option name="processCode" value="true" />
<option name="processLiterals" value="true" />
<option name="processComments" value="true" />
</inspection_tool>
</profile>
</component>

6
.idea/inspectionProfiles/profiles_settings.xml generated Normal file
@@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

7
.idea/misc.xml generated Normal file
@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Crawler" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Crawler" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml generated Normal file
@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/DailyMotion.iml" filepath="$PROJECT_DIR$/.idea/DailyMotion.iml" />
</modules>
</component>
</project>

6
.idea/vcs.xml generated Normal file
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

225
DB.py Normal file
@@ -0,0 +1,225 @@
import json
import redis
import pymysql
import time
class DBVidcon:
_MYSQL_CONF = {
"host": "192.144.230.75",
"port": 3306,
"user": "db_vidcon",
"password": "rexdK4fhCCiRE4BZ",
"database": "db_vidcon",
"charset": "utf8mb4",
"cursorclass": pymysql.cursors.DictCursor,
}
_REDIS_CONF = {
"host": "192.144.230.75",
"port": 6379,
"password": "qwert@$123!&",
"decode_responses": True,
}
def __init__(self):
self.list_key = "video_kw_queue"
self.record_list_key = "record_kw_queue"
self.urgent_list_key = "video_urgent_queue"
self.conn = pymysql.connect(**self._MYSQL_CONF)
self.cursor = self.conn.cursor()
self.redis = redis.Redis(**self._REDIS_CONF)
def _connect_redis(self):
"""初始化或重建 Redis 客户端"""
self.redis = redis.Redis(**self._REDIS_CONF)
def reconnect_redis(self):
"""当捕获到 ConnectionError 时,重连 Redis"""
try:
self._connect_redis()
except Exception as e:
print("[Redis reconnect error]", e)
time.sleep(2)
def push_record(self, data: dict):
raw = json.dumps(data, ensure_ascii=False)
try:
self.redis.lpush(self.record_list_key, raw)
except redis.exceptions.ConnectionError:
self.reconnect_redis()
self.redis.lpush(self.record_list_key, raw)
def fetch_records(self, count: int = 100):
try:
raws = self.redis.lpop(self.record_list_key, count)
        except TypeError:
            # Older redis-py versions do not accept a count argument for lpop;
            # fall back to popping items one at a time.
raws = []
for _ in range(count):
item = self.redis.rpop(self.record_list_key)
if item is None:
break
raws.append(item)
except redis.exceptions.ConnectionError:
self.reconnect_redis()
return self.fetch_records(count)
if not raws:
return []
if isinstance(raws, str):
raws = [raws]
out = []
for raw in raws:
try:
data = json.loads(raw)
out.append((raw, data))
except json.JSONDecodeError:
continue
return out
def fetch_from_redis(self, count: int = 100, list_key: str = None):
key = list_key
try:
raws = self.redis.lpop(key, count)
except TypeError:
raws = []
for _ in range(count):
item = self.redis.rpop(key)
if item is None:
break
raws.append(item)
except redis.exceptions.ConnectionError as e:
print("[Redis pop error]", e)
self.reconnect_redis()
return []
if not raws:
return []
if isinstance(raws, str):
raws = [raws]
out = []
for raw in raws:
try:
out.append((raw, json.loads(raw)))
except json.JSONDecodeError:
continue
return out
def rollback_records(self, raws):
if isinstance(raws, str):
raws = [raws]
self.redis.lpush(self.record_list_key, *raws)
def rollback_urgent(self, raws):
"""写库失败或加急任务处理失败时,把原始 JSON 退回 urgent 列表"""
if isinstance(raws, str):
raws = [raws]
try:
self.redis.lpush(self.urgent_list_key, *raws)
except redis.exceptions.ConnectionError as e:
print("[Redis urgent rollback error]", e)
self.reconnect_redis()
self.redis.lpush(self.urgent_list_key, *raws)
def item_keyword(self, count: int = 100):
try:
urgent_items = self.fetch_from_redis(count, list_key=self.urgent_list_key)
except Exception as e:
print("[Redis urgent pop error]", e)
self.reconnect_redis()
urgent_items = []
if urgent_items:
return urgent_items, 1
try:
items = self.fetch_from_redis(count, list_key=self.list_key)
except Exception as e:
print("[Redis normal pop error]", e)
self.reconnect_redis()
return [], 0
return items, 2
def rollback(self, payloads):
if not payloads:
return
if isinstance(payloads, str):
payloads = [payloads]
self.redis.rpush(self.list_key, *payloads)
print(f"[回滚] 已退回 {len(payloads)}")
def upsert_video(self, data: dict):
"""
1) 插入到 sh_dm_video_op_v2
2) DELETE sh_dm_video_v2 WHERE rn = AND v_xid =
3) INSERT INTO sh_dm_video_v2 () VALUES ()
"""
        # default values for optional fields
data.setdefault("a_id", 0)
data.setdefault("history_status", "")
data.setdefault("is_repeat", 3)
data["sort"] = data.get("index", 0)
try:
sql_op = """
INSERT INTO sh_dm_video_op_v2 (
v_id, v_xid, a_id, level, name_title,
keyword, rn, history_status, is_repeat,
sort, createtime, updatetime, batch, machine
) VALUES (
%(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s,
%(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s,
%(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s
)
"""
self.cursor.execute(sql_op, data)
sql_del = """
DELETE FROM sh_dm_video_v2
WHERE rn = %(rn)s
AND v_xid = %(v_xid)s
"""
self.cursor.execute(sql_del, data)
sql_ins = """
INSERT INTO sh_dm_video_v2 (
v_id, v_xid, rn, v_name, title, link,
is_piracy, edition, duration,
watch_number, follow_number, video_number,
public_time, cover_pic, sort,
u_xid, u_id, u_pic, u_name,
status, createtime, updatetime
) VALUES (
%(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s,
3, '', %(duration)s,
%(watch_number)s, %(fans)s, %(videos)s,
%(create_time)s, %(cover_pic)s, %(sort)s,
%(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s,
1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP()
)
"""
self.cursor.execute(sql_ins, data)
except Exception as e:
            # log the error and re-raise so the caller can roll back
            print("[DB write error]", str(e))
            print("[offending data]", data)
raise
def flush(self):
"""批量执行完后手动提交。"""
self.conn.commit()
def close(self):
self.cursor.close()
self.conn.close()
def get_proxy(self, region_code: str) -> str:
"""
Redis 队列 proxy_queue:<region_code> 弹出一个代理并返回
如果队列为空返回空字符串
"""
proxy = ""
while True:
key = f"proxy_queue:{region_code}"
proxy = self.redis.lpop(key)
if proxy is None:
time.sleep(10)
else:
break
return proxy
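# For reference: get_proxy() above pops from per-region Redis lists named proxy_queue:<region_code>.
# multi_proxy_refill.py keeps those lists filled with ready-to-use entries of the form
# "http://user:password@ip:port" (placeholders shown here), which post_with_retry() in main.py
# then passes as both the http and https proxy.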

37
README.md Normal file
@@ -0,0 +1,37 @@
# DailyMotion Distributed Crawler
### File overview
```text
consume_video_records.py   centralized record writer (deprecated)
DB.py                      database helpers (MySQL + Redis)
dump_keyword_title.py      task dispatch script  -l [level] [-u 1: urgent]
init_python.sh             bootstrap the Python environment
install_ql.sh              install the QingLong panel
main.py                    crawler  -m [machine id] -w [concurrency]
multi_proxy_refill.py      automatic proxy-pool maintenance
mysql_to_xlsx.py           export script (run from the mainland network only)
README.md
requirements.txt           unified dependency list
```
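A typical run, based on the flags above (level and machine number are examples only):

1. `python3 dump_keyword_title.py -l 3` pushes level-3 keyword tasks into the `video_kw_queue` Redis list (add `-u 1` to target the urgent queue instead).
2. `python3 multi_proxy_refill.py` keeps the per-region `proxy_queue:<code>` lists topped up.
3. `python3 main.py -m 1 -w 10` starts a crawler as machine 1 with 10 concurrent workers.
4. `python3 mysql_to_xlsx.py` exports the collected rows to Excel, one file per region.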
### QingLong panels

| Region | IP | User | Password | QingLong panel URL |
| ------ | --------------- | ---- | ---------- | --------------------------- |
| *Indonesia | 165.154.150.131 | root | 74jX4EScfC | http://165.154.150.131:5700 |
| Singapore | 152.32.220.250 | root | nWF9Cn}58A | http://152.32.220.250:5700 |
| Taipei | 123.58.197.91 | root | ZL%&f72)4i | http://123.58.197.91:5700 |
| Bangkok | 165.154.181.211 | root | hi7Xu!MH6] | http://165.154.181.211:5700 |
| Ho Chi Minh City | 156.232.99.139 | root | Jg74M5cT6u | http://156.232.99.139:5700 |
| Seoul | 152.32.243.118 | root | P7Ymwns&^8 | http://152.32.243.118:5700 |

#### Shared QingLong panel login:
User: `admin`
Password: `P@ssWord`
- The Indonesia machine's database is at `127.0.0.1`, with each port offset by +1

43
consume_video_records.py Normal file
@@ -0,0 +1,43 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import traceback
from DB import DBVidcon
def main():
db = DBVidcon()
try:
while True:
tasks = db.fetch_records(100)
if not tasks:
time.sleep(2)
continue
raws_to_rollback = []
for raw, data in tasks:
try:
db.upsert_video(data)
except Exception:
traceback.print_exc()
raws_to_rollback.append(raw)
try:
db.flush()
except Exception:
traceback.print_exc()
db.rollback_records([r for r, _ in tasks])
else:
if raws_to_rollback:
db.rollback_records(raws_to_rollback)
except KeyboardInterrupt:
print("Interrupted, exiting.")
except Exception:
traceback.print_exc()
finally:
db.close()
if __name__ == "__main__":
main()

84
dump_keyword_title.py Normal file
@@ -0,0 +1,84 @@
#!/usr/bin/env python3
"""
用法示例
--------
# 用默认 level=1
python3 dump_keyword_title.py
# 指定 level=3
python3 dump_keyword_title.py -l 3
"""
import json, time, pymysql, redis
import argparse
# ======= Configuration =======
MYSQL_CONFIG = {
"host": "192.144.230.75", "port": 3306,
"user": "db_vidcon", "password": "rexdK4fhCCiRE4BZ",
"database": "db_vidcon", "charset": "utf8mb4",
"cursorclass": pymysql.cursors.DictCursor
}
REDIS_CONFIG = {
"host": "192.144.230.75", "port": 6379,
"password": "qwert@$123!&", "decode_responses": True
}
LIST_KEY = "video_kw_queue"
BATCH_SIZE = 1000
SQL = """
SELECT
k.keyword,
k.rn,
t.title AS v_name,
ANY_VALUE(t.level) AS level
FROM
sh_dm_keyword k
LEFT JOIN
sh_dm_title t ON k.title = t.title
WHERE
k.status = 1
AND t.status = 1
AND NOT EXISTS (
SELECT 1 FROM sh_dm_black_keyword b WHERE b.title = t.title
)
AND t.level = %s
GROUP BY k.keyword, k.rn
"""
def parse_args():
parser = argparse.ArgumentParser(
description="Dump keyword/title rows into Redis list."
)
parser.add_argument("-l", "--level", type=int, default=99,
help="value for t.level (default: 99)")
parser.add_argument("-u", "--urgent", type=int, default=0,
help="加急标记")
return parser.parse_args()
def main():
args = parse_args()
batch_ts = int(time.time())
conn = pymysql.connect(**MYSQL_CONFIG)
cur = conn.cursor()
cur.execute(SQL, (args.level,))
r = redis.Redis(**REDIS_CONFIG)
pipe = r.pipeline()
total = 0
start = time.time()
global LIST_KEY
if args.urgent == 1:
LIST_KEY = "video_urgent_queue"
for row in cur:
row["batch"] = batch_ts
pipe.lpush(LIST_KEY, json.dumps(row, ensure_ascii=False))
total += 1
if total % BATCH_SIZE == 0:
pipe.execute()
if pipe.command_stack:
pipe.execute()
print(f"✔ 推送 {total}level={args.level}, batch={batch_ts})→ Redis '{LIST_KEY}',耗时 {time.time()-start:.2f}s")
if __name__ == "__main__":
main()
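# For reference, each queue entry is the JSON-serialized row from the SELECT above plus "batch".
# The values below are illustrative only; the keys are exactly what main.py later reads from a task:
#   {"keyword": "...", "rn": "美国", "v_name": "...", "level": 3, "batch": 1747324254}
# main.py maps "rn" to a proxy region and pops these tasks via DBVidcon.item_keyword().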

30
init_python.sh Normal file
@@ -0,0 +1,30 @@
#!/usr/bin/env bash
set -euo pipefail
# 1. Change to the project root
cd /opt/ql/
apt-get install -y python3-venv
# 2. Create the virtualenv if it does not exist
if [ ! -d daily_com ]; then
  echo "Creating virtualenv daily_com..."
  python3 -m venv daily_com
fi
# 3. Activate the venv
# shellcheck disable=SC1091
source daily_com/bin/activate
# 4. Upgrade pip
echo "Upgrading pip..."
pip install --upgrade pip
# 5. Install dependencies
if [ -f requirements.txt ]; then
  echo "Installing dependencies from requirements.txt..."
  pip install -r requirements.txt
else
  echo "⚠️ requirements.txt not found, skipping dependency install"
fi

62
install_ql.sh Normal file
@@ -0,0 +1,62 @@
#!/usr/bin/env bash
# install_ql2183.sh install QingLong v2.18.3-3 (host mode)
set -euo pipefail
export DEBIAN_FRONTEND=noninteractive
######################## Customizable settings ########################
QL_VERSION="2.18.3-3"
NODE_MAJOR="20"
QL_DATA_DIR="/opt/ql/data" # 仅数据放 /opt/ql
QL_PORT="5700"
##############################################################
[[ $EUID -eq 0 ]] || { echo "请用 root 或 sudo 执行"; exit 1; }
echo "▶ 允许 APT 接受 Suite 变更..."
cat >/etc/apt/apt.conf.d/99releaseinfochange <<'EOF'
Acquire::AllowReleaseInfoChange::Suite "true";
Acquire::AllowReleaseInfoChange::Version "true";
EOF
echo "▶ 刷新索引 & 安装依赖..."
apt-get update -qq
apt-get install -y curl git python3 python3-pip build-essential >/dev/null
echo "▶ 安装 Node.js $NODE_MAJOR.x..."
curl -fsSL https://deb.nodesource.com/setup_${NODE_MAJOR}.x | bash - >/dev/null
apt-get install -y nodejs >/dev/null
echo "▶ 安装 pnpm 与 QingLong..."
npm install -g pnpm@8.3.1 node-pre-gyp >/dev/null
npm install -g "@whyour/qinglong@${QL_VERSION}" >/dev/null
# 🟢 Resolve the real path of the QingLong source automatically
QL_DIR="$(npm root -g)/@whyour/qinglong"
echo "▶ QingLong source directory: $QL_DIR"
echo "▶ Creating data directory $QL_DATA_DIR..."
mkdir -p "$QL_DATA_DIR"
chown -R "$(logname)":"$(logname)" "$QL_DATA_DIR"
# 🟢 Copy .env from the example if it does not exist yet
if [[ ! -f "$QL_DIR/.env" && -f "$QL_DIR/.env.example" ]]; then
cp "$QL_DIR/.env.example" "$QL_DIR/.env"
echo "▶ 已生成 $QL_DIR/.env"
fi
echo "▶ 写入环境变量文件..."
cat >/etc/profile.d/qinglong.sh <<EOF
# >>> QingLong env
export QL_DIR="$QL_DIR"
export QL_DATA_DIR="$QL_DATA_DIR"
export QL_PORT="$QL_PORT"
export PATH="\$PATH:$(npm root -g)/../.bin"
# <<< QingLong env
EOF
chmod 644 /etc/profile.d/qinglong.sh
# 🟢 Take effect immediately
. /etc/profile.d/qinglong.sh
echo -e "\n✅ 安装完成!可立即运行: qinglong init"
echo " 如果要开机自启,再执行 qinglong boot 或写 systemd 均可。"

796
main.py Normal file
@@ -0,0 +1,796 @@
from urllib.parse import quote
import argparse
import time
import uuid
import concurrent.futures
import requests
import datetime
from requests import RequestException
from DB import DBVidcon
from dateutil import parser as date_parser
batch = str(int(time.time()))
db = DBVidcon()
proxies_address = {
"印度尼西亚": "ID",
"马来": "MY",
"加拿大": "CA",
"台湾": "TW", #"CN_city_TW",
"泰国": "TH",
"美国": "US",
"西班牙": "ES",
"韩国": "KR",
"香港": "HK", #"CN_city_HK",
"越南": "VN",
}
MACHINE_ID = None
MAX_WORKERS = 10
def get_part_ids(part_num: int, take: int, offset: int = 0):
part_ids = list(range(offset, offset + take))
if max(part_ids) >= part_num:
raise ValueError(f"分片编号超出范围PART_IDS={part_ids} 超过 PART_NUM={part_num}")
next_offset = offset + take
if next_offset < part_num:
print(f"[提示] 下一台机器 offset 应该为: {next_offset}")
else:
print(f"[提示] 当前分片已经覆盖至末尾,无需更多机器")
return part_ids
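# Example with illustrative numbers: get_part_ids(part_num=10, take=3, offset=0) returns [0, 1, 2]
# and prints that the next machine's offset should be 3; offset=9 would raise, because shard ids
# 10 and 11 do not exist when part_num is 10.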
def clean_dash_to_zero(val):
if val in ('-', '', None):
return 0
try:
return int(val)
except (ValueError, TypeError) as e:
print(f"[字段异常] val = {val}{str(e)}")
return 0
def format_create_time(timestr):
try:
dt = date_parser.isoparse(timestr)
return dt.strftime("%Y-%m-%d %H:%M:%S")
except Exception as e:
print(f"[时间格式错误] {timestr}{str(e)}")
return "1970-01-01 00:00:00"
def format_duration(seconds):
try:
seconds = int(seconds)
return f"{seconds // 60:02}:{seconds % 60:02}"
except Exception:
return "00:00"
headers1 = {
'Accept': '*/*, */*',
# 'Accept-Encoding': 'gzip, deflate, br',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
# 'Content-Length': '6237',
'Content-Type': 'application/json, application/json',
'Host': 'graphql.api.dailymotion.com',
'Origin': 'https://www.dailymotion.com',
'Referer': 'https://www.dailymotion.com/',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-site',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
'X-DM-AppInfo-Id': 'com.dailymotion.neon',
'X-DM-AppInfo-Type': 'website',
'X-DM-AppInfo-Version': 'v2025-04-28T12:37:52.391Z',
'X-DM-Neon-SSR': '0',
'X-DM-Preferred-Country': 'us',
'accept-language': 'zh-CN',
'authorization': 'Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhaWQiOiJmMWEzNjJkMjg4YzFiOTgwOTljNyIsInJvbCI6ImNhbi1tYW5hZ2UtcGFydG5lcnMtcmVwb3J0cyBjYW4tcmVhZC12aWRlby1zdHJlYW1zIGNhbi1zcG9vZi1jb3VudHJ5IGNhbi1hZG9wdC11c2VycyBjYW4tcmVhZC1jbGFpbS1ydWxlcyBjYW4tbWFuYWdlLWNsYWltLXJ1bGVzIGNhbi1tYW5hZ2UtdXNlci1hbmFseXRpY3MgY2FuLXJlYWQtbXktdmlkZW8tc3RyZWFtcyBjYW4tZG93bmxvYWQtbXktdmlkZW9zIGFjdC1hcyBhbGxzY29wZXMgYWNjb3VudC1jcmVhdG9yIGNhbi1yZWFkLWFwcGxpY2F0aW9ucyIsInNjbyI6InJlYWQgd3JpdGUgZGVsZXRlIGVtYWlsIHVzZXJpbmZvIGZlZWQgbWFuYWdlX3ZpZGVvcyBtYW5hZ2VfY29tbWVudHMgbWFuYWdlX3BsYXlsaXN0cyBtYW5hZ2VfdGlsZXMgbWFuYWdlX3N1YnNjcmlwdGlvbnMgbWFuYWdlX2ZyaWVuZHMgbWFuYWdlX2Zhdm9yaXRlcyBtYW5hZ2VfbGlrZXMgbWFuYWdlX2dyb3VwcyBtYW5hZ2VfcmVjb3JkcyBtYW5hZ2Vfc3VidGl0bGVzIG1hbmFnZV9mZWF0dXJlcyBtYW5hZ2VfaGlzdG9yeSBpZnR0dCByZWFkX2luc2lnaHRzIG1hbmFnZV9jbGFpbV9ydWxlcyBkZWxlZ2F0ZV9hY2NvdW50X21hbmFnZW1lbnQgbWFuYWdlX2FuYWx5dGljcyBtYW5hZ2VfcGxheWVyIG1hbmFnZV9wbGF5ZXJzIG1hbmFnZV91c2VyX3NldHRpbmdzIG1hbmFnZV9jb2xsZWN0aW9ucyBtYW5hZ2VfYXBwX2Nvbm5lY3Rpb25zIG1hbmFnZV9hcHBsaWNhdGlvbnMgbWFuYWdlX2RvbWFpbnMgbWFuYWdlX3BvZGNhc3RzIiwibHRvIjoiZVdGV1JTSkdXRVZjVGg0eEYyRWpWblFlTHdrdUhTVjVPMGdrWGciLCJhaW4iOjEsImFkZyI6MSwiaWF0IjoxNzQ2MjU3NzI1LCJleHAiOjE3NDYyOTM1NjgsImRtdiI6IjEiLCJhdHAiOiJicm93c2VyIiwiYWRhIjoid3d3LmRhaWx5bW90aW9uLmNvbSIsInZpZCI6IjY0NjMzRDAzMDY1RjQxODZBRDBCMDI3Q0Y3OTVFRjBGIiwiZnRzIjo5MTE0MSwiY2FkIjoyLCJjeHAiOjIsImNhdSI6Miwia2lkIjoiQUY4NDlERDczQTU4NjNDRDdEOTdEMEJBQjA3MjI0M0IifQ.bMzShOLIb6datC92qGPTRVCW9eINTYDFwLtqed2P1d4',
'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'x-dm-visit-id': '1745971699160',
'x-dm-visitor-id': '64633D03065F4186AD0B027CF795EF0F',
}
Gproxies = None
def get_proxies(g):
url = "https://www.kookeey.com/pickdynamicips"
params = {
"auth": "pwd",
"format": "1",
"n": "1",
"p": "http",
"gate": "sea",
"g": g,
"r": "0",
"type": "json",
"sign": "10099426b05c7119e9c4dbd6a7a0aa4e",
"accessid": "2207189",
"dl": ","
}
try:
response = requests.get(url, params=params)
except RequestException:
return get_proxies(g)
try:
proxy_data = response.json()['data'][0]
except Exception:
print(g)
print("数据返回解析错误!"+ str(response.text))
time.sleep(5)
return get_proxies(g)
proxies_url = f"http://{proxy_data['username']}:{proxy_data['password']}@{proxy_data['ip']}:{proxy_data['port']}"
proxies = {
"http": proxies_url,
"https": proxies_url,
}
return proxies
def post_with_retry(url, json_payload=None, data=None, headers=None, proxies=None,
retries=5, timeout=10, backoff_factor=2, verbose=True):
token_refreshed = False
for attempt in range(1, retries + 1):
try:
proxy_str = db.get_proxy(Gproxies)
proxies = {"http": proxy_str, "https": proxy_str}
resp = requests.post(
url,
json=json_payload,
data=data,
headers=headers,
proxies=proxies,
timeout=timeout
)
if resp.status_code == 401 and not token_refreshed:
if verbose:
print("[post_with_retry] 收到 401刷新 token 后重试")
gettoken()
token_refreshed = True
continue
resp.raise_for_status()
return resp
except RequestException as e:
if verbose:
print(f"[{attempt}/{retries}] 请求失败: {e}")
# 如果还没刷新过 token就刷新一次
if not token_refreshed:
if verbose:
print("[post_with_retry] 刷新 token 后再试")
gettoken()
token_refreshed = True
continue
if attempt == retries:
if verbose:
print(f"[post_with_retry] 最终失败:{url}")
return None
sleep_time = backoff_factor * (2 ** (attempt - 1))
if verbose:
print(f"[post_with_retry] 等待 {sleep_time}s 后重试…")
time.sleep(sleep_time)
def gettoken():
headers = {
'host': 'graphql.api.dailymotion.com',
'sec-ch-ua-platform': '"Windows"',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36 Edg/136.0.0.0',
'sec-ch-ua': '"Chromium";v="136", "Microsoft Edge";v="136", "Not.A/Brand";v="99"',
'content-type': 'application/x-www-form-urlencoded',
'sec-ch-ua-mobile': '?0',
'accept': '*/*',
'origin': 'https://www.dailymotion.com',
'sec-fetch-site': 'same-site',
'sec-fetch-mode': 'cors',
'sec-fetch-dest': 'empty',
'referer': 'https://www.dailymotion.com/',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'priority': 'u=1, i',
}
u = uuid.uuid4()
uuid_with_dash = str(u)
uuid_no_dash = u.hex
data = {
'client_id': 'f1a362d288c1b98099c7',
'client_secret': 'eea605b96e01c796ff369935357eca920c5da4c5',
'grant_type': 'client_credentials',
'traffic_segment': '244677',
'visitor_id': uuid_with_dash,
}
try:
proxy_str = db.get_proxy(Gproxies)
url = 'https://graphql.api.dailymotion.com/oauth/token'
response = requests.post(url, headers=headers, data=data, proxies={"http": proxy_str, "https": proxy_str})
token = response.json()['access_token']
headers1['authorization'] = "Bearer " + token
headers1['x-dm-visit-id'] = str(int(time.time() * 1000))
headers1['x-dm-visitor-id'] = uuid_no_dash
except Exception as e:
print(str(e))
pass
def get_searchInfo(keyword):
video_list = []
for j in range(1, 3):
        # don't expand this = = !
data = {
"operationName": "SEARCH_QUERY",
"variables": {
"query": keyword,
"shouldIncludeTopResults": True,
"shouldIncludeChannels": False,
"shouldIncludePlaylists": False,
"shouldIncludeHashtags": False,
"shouldIncludeVideos": False,
"shouldIncludeLives": False,
"page": j,
"limit": 100,
"recaptchaToken": None
},
"query": """
fragment VIDEO_BASE_FRAGMENT on Video {
id
xid
title
createdAt
duration
aspectRatio
thumbnail(height: PORTRAIT_240) {
id
url
__typename
}
creator {
id
xid
name
displayName
accountType
avatar(height: SQUARE_60) {
id
url
__typename
}
__typename
}
__typename
}
fragment CHANNEL_BASE_FRAG on Channel {
id
xid
name
displayName
accountType
isFollowed
avatar(height: SQUARE_120) {
id
url
__typename
}
followerEngagement {
id
followDate
__typename
}
metrics {
id
engagement {
id
followers {
edges {
node {
id
total
__typename
}
__typename
}
__typename
}
__typename
}
__typename
}
__typename
}
fragment PLAYLIST_BASE_FRAG on Collection {
id
xid
name
description
thumbnail(height: PORTRAIT_240) {
id
url
__typename
}
creator {
id
xid
name
displayName
accountType
avatar(height: SQUARE_60) {
id
url
__typename
}
__typename
}
metrics {
id
engagement {
id
videos(filter: {visibility: {eq: PUBLIC}}) {
edges {
node {
id
total
__typename
}
__typename
}
__typename
}
__typename
}
__typename
}
__typename
}
fragment HASHTAG_BASE_FRAG on Hashtag {
id
xid
name
metrics {
id
engagement {
id
videos {
edges {
node {
id
total
__typename
}
__typename
}
__typename
}
__typename
}
__typename
}
__typename
}
fragment LIVE_BASE_FRAGMENT on Live {
id
xid
title
audienceCount
aspectRatio
isOnAir
thumbnail(height: PORTRAIT_240) {
id
url
__typename
}
creator {
id
xid
name
displayName
accountType
avatar(height: SQUARE_60) {
id
url
__typename
}
__typename
}
__typename
}
query SEARCH_QUERY($query: String!, $shouldIncludeTopResults: Boolean!, $shouldIncludeVideos: Boolean!, $shouldIncludeChannels: Boolean!, $shouldIncludePlaylists: Boolean!, $shouldIncludeHashtags: Boolean!, $shouldIncludeLives: Boolean!, $page: Int, $limit: Int, $sortByVideos: SearchVideoSort, $durationMinVideos: Int, $durationMaxVideos: Int, $createdAfterVideos: DateTime, $recaptchaToken: String) {
search(token: $recaptchaToken) {
id
stories(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeTopResults) {
metadata {
id
algorithm {
uuid
__typename
}
__typename
}
pageInfo {
hasNextPage
nextPage
__typename
}
edges {
node {
...VIDEO_BASE_FRAGMENT
...CHANNEL_BASE_FRAG
...PLAYLIST_BASE_FRAG
...HASHTAG_BASE_FRAG
...LIVE_BASE_FRAGMENT
__typename
}
__typename
}
__typename
}
videos(
query: $query
first: $limit
page: $page
sort: $sortByVideos
durationMin: $durationMinVideos
durationMax: $durationMaxVideos
createdAfter: $createdAfterVideos
) @include(if: $shouldIncludeVideos) {
metadata {
id
algorithm {
uuid
__typename
}
__typename
}
pageInfo {
hasNextPage
nextPage
__typename
}
edges {
node {
id
...VIDEO_BASE_FRAGMENT
__typename
}
__typename
}
__typename
}
lives(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeLives) {
metadata {
id
algorithm {
uuid
__typename
}
__typename
}
pageInfo {
hasNextPage
nextPage
__typename
}
edges {
node {
id
...LIVE_BASE_FRAGMENT
__typename
}
__typename
}
__typename
}
channels(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeChannels) {
metadata {
id
algorithm {
uuid
__typename
}
__typename
}
pageInfo {
hasNextPage
nextPage
__typename
}
edges {
node {
id
...CHANNEL_BASE_FRAG
__typename
}
__typename
}
__typename
}
playlists: collections(query: $query, first: $limit, page: $page) @include(if: $shouldIncludePlaylists) {
metadata {
id
algorithm {
uuid
__typename
}
__typename
}
pageInfo {
hasNextPage
nextPage
__typename
}
edges {
node {
id
...PLAYLIST_BASE_FRAG
__typename
}
__typename
}
__typename
}
hashtags(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeHashtags) {
metadata {
id
algorithm {
uuid
__typename
}
__typename
}
pageInfo {
hasNextPage
nextPage
__typename
}
edges {
node {
id
...HASHTAG_BASE_FRAG
__typename
}
__typename
}
__typename
}
__typename
}
}
"""
}
gettoken()
response = post_with_retry(
"https://graphql.api.dailymotion.com/",
json_payload=data,
headers=headers1,
proxies=None
)
jsondata = response.json()
try:
resinfo = jsondata['data']['search']['stories']['edges']
print('resinfo :', len(resinfo))
except Exception:
resinfo = []
print(response.text)
print("返回字段解析错误!")
video_tasks = []
for index, iteminfo in enumerate(resinfo):
calculated_index = index + 1 + (j - 1) * 100
node = iteminfo['node']
if node['__typename'] != "Video":
continue
creator = node['creator']
video_tasks.append({
"index": calculated_index,
"xid": node.get('xid'),
"node": node,
"creator": creator,
})
def safe_fetch(task, max_try=2):
attempt = 0
while attempt < max_try:
try:
return fetch_video_detail(task)
except Exception as e:
attempt += 1
print(f"[线程异常] {task['xid']} 获取失败: {str(e)}")
node = task["node"]
creator = task["creator"]
avatar = creator.get("avatar", {})
return {
"index": task["index"],
"v_id": node.get("id"),
"v_xid": task["xid"],
"link": "https://www.dailymotion.com/video/" + task["xid"],
"title": node.get("title"),
"createtime": node.get("createdAt"),
"duration": node.get("duration"),
"pic": node.get("thumbnail", {}).get("url"),
"view": 0,
"fans": 0,
"videos": 0,
"u_id": creator.get('id'),
"u_xid": creator.get('xid'),
"u_name": creator.get('name'),
"u_pic": avatar.get('url'),
"_region": Gproxies
}
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
results = list(executor.map(safe_fetch, video_tasks))
for result in results:
if result:
video_list.append(result)
return video_list
def fetch_video_detail(task):
xid = task["xid"]
v_info = get_videoInfo(xid)
node = task["node"]
creator = task["creator"]
avatar = creator.get("avatar", {})
return {
"index": task["index"],
"v_id": node.get("id"),
"v_xid": xid,
"link": "https://www.dailymotion.com/video/" + xid,
"title": node.get("title"),
"createtime": node.get("createdAt"),
"duration": node.get("duration"),
"pic": node.get("thumbnail", {}).get("url"),
"view": v_info['view'],
"fans": v_info['fans'],
"videos": v_info['videos'],
"u_id": creator.get('id'),
"u_xid": creator.get('xid'),
"u_name": creator.get('name'),
"u_pic": avatar.get('url'),
"_region": proxies_address
}
def get_videoInfo(x_id, r=3):
payload = {
"operationName": "WATCHING_VIDEO",
"variables": {
"xid": x_id,
"isSEO": False
},
"query": "fragment VIDEO_FRAGMENT on Video {\n id\n xid\n isPublished\n duration\n title\n description\n thumbnailx60: thumbnailURL(size: \"x60\")\n thumbnailx120: thumbnailURL(size: \"x120\")\n thumbnailx240: thumbnailURL(size: \"x240\")\n thumbnailx360: thumbnailURL(size: \"x360\")\n thumbnailx480: thumbnailURL(size: \"x480\")\n thumbnailx720: thumbnailURL(size: \"x720\")\n thumbnailx1080: thumbnailURL(size: \"x1080\")\n aspectRatio\n category\n categories(filter: {category: {eq: CONTENT_CATEGORY}}) {\n edges {\n node { id name slug __typename }\n __typename\n }\n __typename\n }\n iab_categories: categories(\n filter: {category: {eq: IAB_CATEGORY}, percentage: {gte: 70}}\n ) {\n edges {\n node { id slug __typename }\n __typename\n }\n __typename\n }\n bestAvailableQuality\n createdAt\n viewerEngagement {\n id\n liked\n favorited\n __typename\n }\n isPrivate\n isWatched\n isCreatedForKids\n isExplicit\n canDisplayAds\n videoWidth: width\n videoHeight: height\n status\n hashtags {\n edges {\n node { id name __typename }\n __typename\n }\n __typename\n }\n stats {\n id\n views { id total __typename }\n __typename\n }\n channel {\n __typename\n id\n xid\n name\n displayName\n isArtist\n logoURLx25: logoURL(size: \"x25\")\n logoURL(size: \"x60\")\n isFollowed\n accountType\n coverURLx375: coverURL(size: \"x375\")\n stats {\n id\n views { id total __typename }\n followers { id total __typename }\n videos { id total __typename }\n __typename\n }\n country { id codeAlpha2 __typename }\n organization @skip(if: $isSEO) {\n id\n xid\n owner { id xid __typename }\n __typename\n }\n }\n language { id codeAlpha2 __typename }\n tags {\n edges {\n node { id label __typename }\n __typename\n }\n __typename\n }\n moderation { id reviewedAt __typename }\n topics(whitelistedOnly: true, first: 3, page: 1) {\n edges {\n node {\n id\n xid\n name\n names {\n edges {\n node {\n id\n name\n language { id codeAlpha2 __typename }\n __typename\n }\n __typename\n }\n __typename\n }\n __typename\n }\n __typename\n }\n __typename\n }\n geoblockedCountries {\n id\n allowed\n denied\n __typename\n }\n transcript {\n edges {\n node { id timecode text __typename }\n __typename\n }\n __typename\n }\n __typename\n}\n\nfragment LIVE_FRAGMENT on Live {\n id\n xid\n startAt\n endAt\n isPublished\n title\n description\n thumbnailx60: thumbnailURL(size: \"x60\")\n thumbnailx120: thumbnailURL(size: \"x120\")\n thumbnailx240: thumbnailURL(size: \"x240\")\n thumbnailx360: thumbnailURL(size: \"x360\")\n thumbnailx480: thumbnailURL(size: \"x480\")\n thumbnailx720: thumbnailURL(size: \"x720\")\n thumbnailx1080: thumbnailURL(size: \"x1080\")\n aspectRatio\n category\n createdAt\n viewerEngagement { id liked favorited __typename }\n isPrivate\n isExplicit\n isCreatedForKids\n bestAvailableQuality\n canDisplayAds\n videoWidth: width\n videoHeight: height\n stats { id views { id total __typename } __typename }\n channel {\n __typename\n id\n xid\n name\n displayName\n isArtist\n logoURLx25: logoURL(size: \"x25\")\n logoURL(size: \"x60\")\n isFollowed\n accountType\n coverURLx375: coverURL(size: \"x375\")\n stats { id views { id total __typename } followers { id total __typename } videos { id total __typename } __typename }\n country { id codeAlpha2 __typename }\n organization @skip(if: $isSEO) { id xid owner { id xid __typename } __typename }\n }\n language { id codeAlpha2 __typename }\n tags { edges { node { id label __typename } __typename } __typename }\n moderation { id reviewedAt __typename }\n topics(whitelistedOnly: true, first: 3, 
page: 1) {\n edges { node { id xid name names { edges { node { id name language { id codeAlpha2 __typename } __typename } __typename } __typename } __typename } __typename }\n __typename\n }\n geoblockedCountries { id allowed denied __typename }\n __typename\n}\n\nquery WATCHING_VIDEO($xid: String!, $isSEO: Boolean!) {\n video: media(xid: $xid) {\n __typename\n ... on Video { id ...VIDEO_FRAGMENT __typename }\n ... on Live { id ...LIVE_FRAGMENT __typename }\n }\n}"
}
url = 'https://graphql.api.dailymotion.com/'
response = post_with_retry(
url,
json_payload=payload,
headers=headers1,
proxies=None,
)
jsondata = response.json()
try:
v_info = jsondata['data']['video']['channel']['stats']
except Exception:
if r > 0:
return get_videoInfo(x_id=x_id, r=r - 1)
else:
return {
"view": '-',
"fans": '-',
"videos": '-',
}
return {
"view": v_info['views']['total'],
"fans": v_info['followers']['total'],
"videos": v_info['videos']['total'],
}
def integrate_data():
while True:
keywords, flag = db.item_keyword()
if len(keywords) < 1:
time.sleep(30)
else:
for index, (payload, kitem) in enumerate(keywords):
try:
global Gproxies
Gproxies = proxies_address[kitem['rn']]
v_list = get_searchInfo(kitem['keyword'])
if not v_list:
for i in range(3):
time.sleep(i * 5)
v_list = get_searchInfo(kitem["keyword"])
if v_list:
break
time.sleep(2)
for item in v_list:
record = {
"keyword": kitem.get("keyword"),
"v_name" : kitem.get("v_name"),
"v_id": item.get("v_id"),
"v_xid": item.get("v_xid"),
"link": item.get("link"),
"title": item.get("title"),
"duration": format_duration(item.get("duration")),
"fans": clean_dash_to_zero(item.get("fans", 0)),
"videos": clean_dash_to_zero(item.get("videos", 0)),
"watch_number": clean_dash_to_zero(item.get("view", 0)),
"create_time": format_create_time(item.get("createtime")),
"cover_pic": item.get("pic"),
"index": item.get("index", 0),
"u_id": item.get("u_id"),
"u_xid": item.get("u_xid"),
"u_name": item.get("u_name"),
"u_pic": item.get("u_pic"),
"rn": kitem.get("rn"),
"batch": kitem['batch'],
"machine_id": MACHINE_ID,
"level": kitem['level'],
}
db.upsert_video(record)
db.flush()
except Exception as e:
print(f"[异常] {str(e.__class__.__name__)}: {str(e)}")
print(f"[异常] 处理关键词 {kitem['keyword']} 时发生错误,正在回滚...")
time.sleep(5)
remaining_payloads = [p for p, _ in keywords[index:]]
if flag == 2:
db.rollback(remaining_payloads)
elif flag == 1:
                        # urgent tasks came from video_urgent_queue, so push them back there
                        db.rollback_urgent(remaining_payloads)
time.sleep(5)
break
def parse_args() -> argparse.Namespace:
global MACHINE_ID, MAX_WORKERS
parser = argparse.ArgumentParser(
description="Configure worker settings."
)
parser.add_argument(
"-m", "--machine-id",
type=int,
help=f"Machine identifier (default: {MACHINE_ID})"
)
parser.add_argument(
"-w", "--max-workers",
type=int,
help=f"Maximum concurrent workers (default: {MAX_WORKERS})"
)
args = parser.parse_args()
if args.machine_id is not None:
MACHINE_ID = args.machine_id
if args.max_workers is not None:
if args.max_workers <= 0:
parser.error("--max-workers 不能是 0")
MAX_WORKERS = args.max_workers
if MACHINE_ID is None:
raise ValueError("请指定机器编号")
return args
if __name__ == '__main__':
parse_args()
start_time = datetime.datetime.now()
print(f"开始时间:{start_time.strftime('%Y-%m-%d %H:%M:%S')}")
integrate_data()
end_time = datetime.datetime.now()
duration = end_time - start_time

133
multi_proxy_refill.py Normal file
@@ -0,0 +1,133 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import sys
import argparse
import redis
import requests
from requests import RequestException
from urllib.parse import quote
# —— Fixed Redis connection config —— #
_REDIS_CONF = {
"host": "127.0.0.1",
"port": 6379,
"password": "qwert@$123!&",
"decode_responses": True,
}
# Thresholds and batch sizes
QUEUE_PREFIX = "proxy_queue"
LOW_WATERMARK = 200
REFILL_BATCH = 1000
SLEEP_INTERVAL = 10
# Region name → region code mapping
PROXIES_ADDRESS = {
"印度尼西亚": "ID",
"马来": "MY",
"加拿大": "CA",
"台湾": "TW", #"CN_city_TW",
"泰国": "TH",
"美国": "US",
"西班牙": "ES",
"韩国": "KR",
"香港": "HK", #"CN_city_HK",
"越南": "VN",
}
# Third-party proxy API parameters
PROXY_API_URL = "https://www.kookeey.com/pickdynamicips"
ACCESS_ID = "2207189"
SIGN = "10099426b05c7119e9c4dbd6a7a0aa4e"
def fetch_proxies(region_name: str, n: int) -> list[str]:
"""
    Request n proxies from the third-party API in one call and return them as formatted proxy URLs.
"""
params = {
"auth": "pwd",
"format": "1",
"n": str(n),
"p": "http",
"gate": "sea",
"g": region_name,
"r": "0",
"type": "json",
"sign": SIGN,
"accessid": ACCESS_ID,
"dl": ",",
}
try:
resp = requests.get(PROXY_API_URL, params=params, timeout=10)
resp.raise_for_status()
data = resp.json()
except (RequestException, ValueError):
time.sleep(1)
return fetch_proxies(region_name, n)
arr = data.get("data") or []
if not arr:
time.sleep(1)
return fetch_proxies(region_name, n)
result = []
for item in arr:
user = quote(item["username"], safe="")
pwd = quote(item["password"], safe="")
ip = item["ip"]
port = item["port"]
result.append(f"http://{user}:{pwd}@{ip}:{port}")
return result
def refill_queue(r: redis.Redis, region_name: str, region_code: str,
low: int, batch: int):
"""
    If the region's queue length is below `low`, top it up toward `batch` entries in one go.
"""
key = f"{QUEUE_PREFIX}:{region_code}"
length = r.llen(key)
if length >= low:
return
to_fetch = batch - length
print(f"[{time.strftime('%H:%M:%S')}] {key} 长度 {length} < {low},一次性拉取 {to_fetch} 条…")
proxies = fetch_proxies(region_name, to_fetch)
if proxies:
r.rpush(key, *proxies)
print(f"[{time.strftime('%H:%M:%S')}] 已入队 {len(proxies)} 条 → 新长度 {r.llen(key)}")
else:
print(f"[{time.strftime('%H:%M:%S')}] 拉取失败,无数据入队")
def main():
p = argparse.ArgumentParser(description="多地区代理队列自动补给")
p.add_argument("--low", "-l", type=int, default=LOW_WATERMARK,
help="低水位阈值,队列长度低于此值时触发补给")
p.add_argument("--batch", "-b", type=int, default=REFILL_BATCH,
help="单次补给目标条数")
p.add_argument("--sleep", "-s", type=int, default=SLEEP_INTERVAL,
help="检测间隔秒数")
args = p.parse_args()
r = redis.Redis(**_REDIS_CONF)
print(f"[*] 启动补给,监控前缀 `{QUEUE_PREFIX}:<code>` 列表,低于 {args.low} 条即补至 {args.batch}")
while True:
try:
for region_name, region_code in PROXIES_ADDRESS.items():
refill_queue(r, region_name, region_code, args.low, args.batch)
time.sleep(args.sleep)
except Exception as e:
msg = str(e).encode("utf-8", "ignore").decode()
print(f"[ERROR {time.strftime('%H:%M:%S')}] {msg}", file=sys.stderr)
time.sleep(args.sleep)
if __name__ == "__main__":
main()
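# Example invocation using the defaults above: python3 multi_proxy_refill.py --low 200 --batch 1000 --sleep 10
# keeps every proxy_queue:<code> list topped up toward 1000 entries whenever it falls below 200.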

89
mysql_to_xlsx.py Normal file
@@ -0,0 +1,89 @@
import pymysql
import pandas as pd
from datetime import datetime
# Database connection config
db_config = {
"host": "192.144.230.75",
"port": 3306,
"user": "db_vidcon",
"password": "rexdK4fhCCiRE4BZ",
"database": "db_vidcon",
"charset": "utf8mb4",
}
def get_rn_list():
"""获取所有地区列表"""
sql = "SELECT DISTINCT rn FROM sh_dm_video_op_v2;"
conn = pymysql.connect(**db_config)
df = pd.read_sql(sql, conn)
conn.close()
return df['rn'].tolist()
def get_data_for_rn(rn: str) -> pd.DataFrame:
"""针对指定 rn 拉取数据"""
# 注意:这里把 SQL 中的 rn 和 level 参数化
sql = f"""
SELECT
op.id AS ID,
v.v_name AS 片名,
v.link AS 视频连接,
v.is_piracy AS 是否盗版,
op.`level` AS 优先级,
op.rn AS 地区,
NULL AS 投诉日期,
NULL AS 下线日期,
op.keyword AS 关键词,
v.title AS 标题,
v.duration AS 时长,
v.watch_number AS 观看数量,
v.public_time AS 上传时间,
v.u_pic AS 头像,
CASE
WHEN dup.cnt > 1 THEN 1
ELSE 2
END AS 是否重复,
op.sort AS 排序,
op.batch AS 批次,
op.machine AS 机器号,
v.u_id AS 用户id,
v.u_xid AS u_xid,
v.u_name AS 用户名称
FROM sh_dm_video_op_v2 AS op
LEFT JOIN (
SELECT
t.v_xid,
COUNT(*) AS cnt
FROM sh_dm_video_op_v2 AS t
WHERE t.batch IN (1747324254, 1747323990)
GROUP BY t.v_xid
) AS dup
ON op.v_xid = dup.v_xid
LEFT JOIN sh_dm_video_v2 AS v
ON op.v_xid = v.v_xid
WHERE op.rn = %s
AND op.batch IN (1747324254, 1747323990)
ORDER BY op.id;
"""
conn = pymysql.connect(**db_config)
    df = pd.read_sql(sql, conn, params=(rn,))  # (rn,) so params is a real one-element tuple
conn.close()
return df
def export_all():
"""循环所有地区,导出 Excel"""
rn_list = get_rn_list()
for rn in rn_list:
df = get_data_for_rn(rn)
if df.empty:
continue
timestamp = datetime.now().strftime("%Y%m%d")
safe_rn = rn.replace(" ", "_") # 如果地区名里有空格或特殊字符
filename = f"{timestamp}_T0T1_{safe_rn}.xlsx"
df.to_excel(filename, index=False)
print(f"已导出:{filename}")
if __name__ == "__main__":
export_all()

20
requirements.txt Normal file
@@ -0,0 +1,20 @@
async-timeout==4.0.3
certifi==2025.4.26
charset-normalizer==3.4.2
et-xmlfile==1.1.0
idna==3.10
importlib-metadata==6.7.0
lxml==5.4.0
numpy==1.21.6
openpyxl==3.1.3
pandas==1.3.5
pkg_resources==0.0.0
PyMySQL==1.1.1
python-dateutil==2.9.0.post0
pytz==2025.2
redis==5.0.8
requests==2.31.0
six==1.17.0
typing_extensions==4.7.1
urllib3==2.0.7
zipp==3.15.0