From 14023c3b483dbfe3bfacc3032d363fc4c3f083d6 Mon Sep 17 00:00:00 2001 From: Franklin-F Date: Fri, 16 May 2025 22:16:26 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=88=9D=E5=A7=8B=E5=8C=96=20DailyMoti?= =?UTF-8?q?on=20=E5=88=86=E5=B8=83=E5=BC=8F=E7=88=AC=E8=99=AB=E9=A1=B9?= =?UTF-8?q?=E7=9B=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/.gitignore | 8 + .idea/DailyMotion.iml | 14 + .idea/inspectionProfiles/Project_Default.xml | 67 ++ .../inspectionProfiles/profiles_settings.xml | 6 + .idea/misc.xml | 7 + .idea/modules.xml | 8 + .idea/vcs.xml | 6 + DB.py | 225 +++++ README.md | 37 + consume_video_records.py | 43 + dump_keyword_title.py | 84 ++ init_python.sh | 30 + install_ql.sh | 62 ++ main.py | 796 ++++++++++++++++++ multi_proxy_refill.py | 133 +++ mysql_to_xlsx.py | 89 ++ requirements.txt | 20 + 17 files changed, 1635 insertions(+) create mode 100644 .idea/.gitignore create mode 100644 .idea/DailyMotion.iml create mode 100644 .idea/inspectionProfiles/Project_Default.xml create mode 100644 .idea/inspectionProfiles/profiles_settings.xml create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml create mode 100644 DB.py create mode 100644 README.md create mode 100644 consume_video_records.py create mode 100644 dump_keyword_title.py create mode 100644 init_python.sh create mode 100644 install_ql.sh create mode 100644 main.py create mode 100644 multi_proxy_refill.py create mode 100644 mysql_to_xlsx.py create mode 100644 requirements.txt diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..35410ca --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml +# 基于编辑器的 HTTP 客户端请求 +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/DailyMotion.iml b/.idea/DailyMotion.iml new file mode 100644 index 0000000..b30b961 --- /dev/null +++ b/.idea/DailyMotion.iml @@ -0,0 +1,14 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..6748c35 --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,67 @@ + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..c4616ff --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..3b9c92d --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/DB.py b/DB.py new file mode 100644 index 0000000..2c9aecd --- /dev/null +++ b/DB.py @@ -0,0 +1,225 @@ +import json +import redis +import pymysql +import time + + +class DBVidcon: + _MYSQL_CONF = { + "host": "192.144.230.75", + "port": 3306, + "user": "db_vidcon", + "password": "rexdK4fhCCiRE4BZ", + "database": "db_vidcon", + "charset": "utf8mb4", + "cursorclass": 
pymysql.cursors.DictCursor, + } + _REDIS_CONF = { + "host": "192.144.230.75", + "port": 6379, + "password": "qwert@$123!&", + "decode_responses": True, + } + + def __init__(self): + self.list_key = "video_kw_queue" + self.record_list_key = "record_kw_queue" + self.urgent_list_key = "video_urgent_queue" + self.conn = pymysql.connect(**self._MYSQL_CONF) + self.cursor = self.conn.cursor() + self.redis = redis.Redis(**self._REDIS_CONF) + + def _connect_redis(self): + """初始化或重建 Redis 客户端""" + self.redis = redis.Redis(**self._REDIS_CONF) + + def reconnect_redis(self): + """当捕获到 ConnectionError 时,重连 Redis""" + try: + self._connect_redis() + except Exception as e: + print("[Redis reconnect error]", e) + time.sleep(2) + + def push_record(self, data: dict): + raw = json.dumps(data, ensure_ascii=False) + try: + self.redis.lpush(self.record_list_key, raw) + except redis.exceptions.ConnectionError: + self.reconnect_redis() + self.redis.lpush(self.record_list_key, raw) + + def fetch_records(self, count: int = 100): + try: + raws = self.redis.lpop(self.record_list_key, count) + except TypeError: + raws = [] + for _ in range(count): + item = self.redis.rpop(self.record_list_key) + if item is None: + break + raws.append(item) + except redis.exceptions.ConnectionError: + self.reconnect_redis() + return self.fetch_records(count) + + if not raws: + return [] + if isinstance(raws, str): + raws = [raws] + + out = [] + for raw in raws: + try: + data = json.loads(raw) + out.append((raw, data)) + except json.JSONDecodeError: + continue + return out + + def fetch_from_redis(self, count: int = 100, list_key: str = None): + key = list_key + try: + raws = self.redis.lpop(key, count) + except TypeError: + raws = [] + for _ in range(count): + item = self.redis.rpop(key) + if item is None: + break + raws.append(item) + except redis.exceptions.ConnectionError as e: + print("[Redis pop error]", e) + self.reconnect_redis() + return [] + if not raws: + return [] + if isinstance(raws, str): + raws = [raws] + out = [] + for raw in raws: + try: + out.append((raw, json.loads(raw))) + except json.JSONDecodeError: + continue + return out + + def rollback_records(self, raws): + if isinstance(raws, str): + raws = [raws] + self.redis.lpush(self.record_list_key, *raws) + + def rollback_urgent(self, raws): + """写库失败或加急任务处理失败时,把原始 JSON 退回 urgent 列表""" + if isinstance(raws, str): + raws = [raws] + try: + self.redis.lpush(self.urgent_list_key, *raws) + except redis.exceptions.ConnectionError as e: + print("[Redis urgent rollback error]", e) + self.reconnect_redis() + self.redis.lpush(self.urgent_list_key, *raws) + + def item_keyword(self, count: int = 100): + try: + urgent_items = self.fetch_from_redis(count, list_key=self.urgent_list_key) + except Exception as e: + print("[Redis urgent pop error]", e) + self.reconnect_redis() + urgent_items = [] + if urgent_items: + return urgent_items, 1 + try: + items = self.fetch_from_redis(count, list_key=self.list_key) + except Exception as e: + print("[Redis normal pop error]", e) + self.reconnect_redis() + return [], 0 + return items, 2 + + def rollback(self, payloads): + if not payloads: + return + if isinstance(payloads, str): + payloads = [payloads] + self.redis.rpush(self.list_key, *payloads) + print(f"[回滚] 已退回 {len(payloads)} 条") + + def upsert_video(self, data: dict): + """ + 1) 插入到 sh_dm_video_op_v2 + 2) DELETE sh_dm_video_v2 WHERE rn = … AND v_xid = … + 3) INSERT INTO sh_dm_video_v2 (…) VALUES (…) + """ + # 保底字段 + data.setdefault("a_id", 0) + data.setdefault("history_status", "") + 
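        # Backfill optional keys so every %(...)s placeholder used by the three
        # statements below (INSERT into sh_dm_video_op_v2, DELETE from
        # sh_dm_video_v2, re-INSERT into sh_dm_video_v2) resolves even when the
        # crawler result omitted the field.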
data.setdefault("is_repeat", 3) + data["sort"] = data.get("index", 0) + + try: + sql_op = """ + INSERT INTO sh_dm_video_op_v2 ( + v_id, v_xid, a_id, level, name_title, + keyword, rn, history_status, is_repeat, + sort, createtime, updatetime, batch, machine + ) VALUES ( + %(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s, + %(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s, + %(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s + ) + """ + self.cursor.execute(sql_op, data) + sql_del = """ + DELETE FROM sh_dm_video_v2 + WHERE rn = %(rn)s + AND v_xid = %(v_xid)s + """ + self.cursor.execute(sql_del, data) + sql_ins = """ + INSERT INTO sh_dm_video_v2 ( + v_id, v_xid, rn, v_name, title, link, + is_piracy, edition, duration, + watch_number, follow_number, video_number, + public_time, cover_pic, sort, + u_xid, u_id, u_pic, u_name, + status, createtime, updatetime + ) VALUES ( + %(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s, + 3, '', %(duration)s, + %(watch_number)s, %(fans)s, %(videos)s, + %(create_time)s, %(cover_pic)s, %(sort)s, + %(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s, + 1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP() + ) + """ + self.cursor.execute(sql_ins, data) + + except Exception as e: + # 打印错误并回滚 + print("[数据库写入异常]", str(e)) + print("[出错数据]", data) + raise + + def flush(self): + """批量执行完后手动提交。""" + self.conn.commit() + + def close(self): + self.cursor.close() + self.conn.close() + + def get_proxy(self, region_code: str) -> str: + """ + 从 Redis 队列 proxy_queue: 弹出一个代理并返回。 + 如果队列为空,返回空字符串。 + """ + proxy = "" + while True: + key = f"proxy_queue:{region_code}" + proxy = self.redis.lpop(key) + if proxy is None: + time.sleep(10) + else: + break + return proxy \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..2f03cd5 --- /dev/null +++ b/README.md @@ -0,0 +1,37 @@ +# DailyMotion 分布式爬虫 + +### 文件说明 + +```text +consume_video_records.py 集中式存储 -弃用 +DB.py 数据库相关 +dump_keyword_title.py 任务下发脚本 -l [level] [-u 1:加急] +init_python.sh 初始化python环境 +install_ql.sh 安装青龙面板 +main.py 爬取脚本 -m [机器号] -w [并发数量] +multi_proxy_refill.py 自动代理池维护 +mysql_to_xlsx.py 导出脚本 限制内陆运行 +README.md +requirements.txt 统一包管理 +``` + +青龙面板 + +| 地区 | IP | 用户 | 密码 | 青龙面板地址 | +| ------ | --------------- | ---- | ---------- | --------------------------- | +| *印尼 | 165.154.150.131 | root | 74jX4EScfC | http://165.154.150.131:5700 | +| 新加坡 | 152.32.220.250 | root | nWF9Cn}58A | http://152.32.220.250:5700 | +| 台北 | 123.58.197.91 | root | ZL%&f72)4i | http://123.58.197.91:5700 | +| 曼谷 | 165.154.181.211 | root | hi7Xu!MH6] | http://165.154.181.211:5700 | +| 胡志明 | 156.232.99.139 | root | Jg74M5cT6u | http://156.232.99.139:5700 | +| 首尔 | 152.32.243.118 | root | P7Ymwns&^8 | http://152.32.243.118:5700 | + +#### 青龙面板统一密码: + +用户: `admin` + +密码: `P@ssWord` + + + +- 印尼机器数据库`127.0.0.1` 端口+1 diff --git a/consume_video_records.py b/consume_video_records.py new file mode 100644 index 0000000..e5962c3 --- /dev/null +++ b/consume_video_records.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import time +import traceback +from DB import DBVidcon + +def main(): + db = DBVidcon() + + try: + while True: + tasks = db.fetch_records(100) + if not tasks: + time.sleep(2) + continue + + raws_to_rollback = [] + for raw, data in tasks: + try: + db.upsert_video(data) + except Exception: + traceback.print_exc() + raws_to_rollback.append(raw) + + try: + db.flush() + except Exception: + traceback.print_exc() + db.rollback_records([r for r, _ in tasks]) + else: + if 
raws_to_rollback: + db.rollback_records(raws_to_rollback) + + except KeyboardInterrupt: + print("Interrupted, exiting.") + except Exception: + traceback.print_exc() + finally: + db.close() + +if __name__ == "__main__": + main() diff --git a/dump_keyword_title.py b/dump_keyword_title.py new file mode 100644 index 0000000..257db2d --- /dev/null +++ b/dump_keyword_title.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python3 +""" +用法示例 +-------- +# 用默认 level=1 +python3 dump_keyword_title.py + +# 指定 level=3 +python3 dump_keyword_title.py -l 3 +""" +import json, time, pymysql, redis +import argparse + +# ======= 配置区 ======= +MYSQL_CONFIG = { + "host": "192.144.230.75", "port": 3306, + "user": "db_vidcon", "password": "rexdK4fhCCiRE4BZ", + "database": "db_vidcon", "charset": "utf8mb4", + "cursorclass": pymysql.cursors.DictCursor +} +REDIS_CONFIG = { + "host": "192.144.230.75", "port": 6379, + "password": "qwert@$123!&", "decode_responses": True +} +LIST_KEY = "video_kw_queue" +BATCH_SIZE = 1000 +SQL = """ +SELECT + k.keyword, + k.rn, + t.title AS v_name, + ANY_VALUE(t.level) AS level +FROM + sh_dm_keyword k +LEFT JOIN + sh_dm_title t ON k.title = t.title +WHERE + k.status = 1 + AND t.status = 1 + AND NOT EXISTS ( + SELECT 1 FROM sh_dm_black_keyword b WHERE b.title = t.title + ) + AND t.level = %s +GROUP BY k.keyword, k.rn +""" + +def parse_args(): + parser = argparse.ArgumentParser( + description="Dump keyword/title rows into Redis list." + ) + parser.add_argument("-l", "--level", type=int, default=99, + help="value for t.level (default: 99)") + parser.add_argument("-u", "--urgent", type=int, default=0, + help="加急标记") + return parser.parse_args() + +def main(): + args = parse_args() + batch_ts = int(time.time()) + conn = pymysql.connect(**MYSQL_CONFIG) + cur = conn.cursor() + cur.execute(SQL, (args.level,)) + r = redis.Redis(**REDIS_CONFIG) + pipe = r.pipeline() + total = 0 + start = time.time() + global LIST_KEY + if args.urgent == 1: + LIST_KEY = "video_urgent_queue" + + for row in cur: + row["batch"] = batch_ts + pipe.lpush(LIST_KEY, json.dumps(row, ensure_ascii=False)) + total += 1 + if total % BATCH_SIZE == 0: + pipe.execute() + + if pipe.command_stack: + pipe.execute() + + print(f"✔ 推送 {total} 行(level={args.level}, batch={batch_ts})→ Redis '{LIST_KEY}',耗时 {time.time()-start:.2f}s") + +if __name__ == "__main__": + main() diff --git a/init_python.sh b/init_python.sh new file mode 100644 index 0000000..006f261 --- /dev/null +++ b/init_python.sh @@ -0,0 +1,30 @@ +#!/usr/bin/env bash +set -euo pipefail + +# 1. 切到项目根目录 +cd /opt/ql/ + +apt-get install -y python3-venv + + +# 2. 创建 virtualenv(如果不存在) +if [ ! -d daily_com ]; then + echo "创建虚拟环境 daily_com..." + python3 -m venv daily_com +fi + +# 3. 激活 venv +# shellcheck disable=SC1091 +source daily_com/bin/activate + +# 4. 升级 pip +echo "升级 pip..." +pip install --upgrade pip + +# 5. 安装依赖 +if [ -f requirements.txt ]; then + echo "安装依赖 requirements.txt..." 
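    # Install the pinned dependencies from requirements.txt into the daily_com
    # virtualenv activated above.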
+ pip install -r requirements.txt +else + echo "⚠️ 未找到 requirements.txt,跳过依赖安装" +fi diff --git a/install_ql.sh b/install_ql.sh new file mode 100644 index 0000000..77c54de --- /dev/null +++ b/install_ql.sh @@ -0,0 +1,62 @@ +#!/usr/bin/env bash +# install_ql2183.sh – install QingLong v2.18.3-3 (host mode) +set -euo pipefail +export DEBIAN_FRONTEND=noninteractive + +######################## 可自定义区域 ######################## +QL_VERSION="2.18.3-3" +NODE_MAJOR="20" +QL_DATA_DIR="/opt/ql/data" # 仅数据放 /opt/ql +QL_PORT="5700" +############################################################## + +[[ $EUID -eq 0 ]] || { echo "请用 root 或 sudo 执行"; exit 1; } + +echo "▶ 允许 APT 接受 Suite 变更..." +cat >/etc/apt/apt.conf.d/99releaseinfochange <<'EOF' +Acquire::AllowReleaseInfoChange::Suite "true"; +Acquire::AllowReleaseInfoChange::Version "true"; +EOF + +echo "▶ 刷新索引 & 安装依赖..." +apt-get update -qq +apt-get install -y curl git python3 python3-pip build-essential >/dev/null + +echo "▶ 安装 Node.js $NODE_MAJOR.x..." +curl -fsSL https://deb.nodesource.com/setup_${NODE_MAJOR}.x | bash - >/dev/null +apt-get install -y nodejs >/dev/null + +echo "▶ 安装 pnpm 与 QingLong..." +npm install -g pnpm@8.3.1 node-pre-gyp >/dev/null +npm install -g "@whyour/qinglong@${QL_VERSION}" >/dev/null + +# 🟢 自动解析 QingLong 源码真实路径 +QL_DIR="$(npm root -g)/@whyour/qinglong" +echo "▶ QingLong 源码目录: $QL_DIR" + +echo "▶ 创建数据目录 $QL_DATA_DIR..." +mkdir -p "$QL_DATA_DIR" +chown -R "$(logname)":"$(logname)" "$QL_DATA_DIR" + +# 🟢 如无 .env 则拷贝一份 +if [[ ! -f "$QL_DIR/.env" && -f "$QL_DIR/.env.example" ]]; then + cp "$QL_DIR/.env.example" "$QL_DIR/.env" + echo "▶ 已生成 $QL_DIR/.env" +fi + +echo "▶ 写入环境变量文件..." +cat >/etc/profile.d/qinglong.sh <>> QingLong env +export QL_DIR="$QL_DIR" +export QL_DATA_DIR="$QL_DATA_DIR" +export QL_PORT="$QL_PORT" +export PATH="\$PATH:$(npm root -g)/../.bin" +# <<< QingLong env +EOF +chmod 644 /etc/profile.d/qinglong.sh + +# 🟢 立刻生效 +. 
/etc/profile.d/qinglong.sh + +echo -e "\n✅ 安装完成!可立即运行: qinglong init" +echo " 如果要开机自启,再执行 qinglong boot 或写 systemd 均可。" diff --git a/main.py b/main.py new file mode 100644 index 0000000..5f58a92 --- /dev/null +++ b/main.py @@ -0,0 +1,796 @@ +from urllib.parse import quote +import argparse +import time +import uuid +import concurrent.futures +import requests +import datetime +from requests import RequestException +from DB import DBVidcon +from dateutil import parser as date_parser + +batch = str(int(time.time())) +db = DBVidcon() +proxies_address = { + "印度尼西亚": "ID", + "马来": "MY", + "加拿大": "CA", + "台湾": "TW", #"CN_city_TW", + "泰国": "TH", + "美国": "US", + "西班牙": "ES", + "韩国": "KR", + "香港": "HK", #"CN_city_HK", + "越南": "VN", +} +MACHINE_ID = None +MAX_WORKERS = 10 + + +def get_part_ids(part_num: int, take: int, offset: int = 0): + part_ids = list(range(offset, offset + take)) + if max(part_ids) >= part_num: + raise ValueError(f"分片编号超出范围,PART_IDS={part_ids} 超过 PART_NUM={part_num}") + next_offset = offset + take + if next_offset < part_num: + print(f"[提示] 下一台机器 offset 应该为: {next_offset}") + else: + print(f"[提示] 当前分片已经覆盖至末尾,无需更多机器") + return part_ids + + +def clean_dash_to_zero(val): + if val in ('-', '', None): + return 0 + try: + return int(val) + except (ValueError, TypeError) as e: + print(f"[字段异常] val = {val} → {str(e)}") + return 0 + + +def format_create_time(timestr): + try: + dt = date_parser.isoparse(timestr) + return dt.strftime("%Y-%m-%d %H:%M:%S") + except Exception as e: + print(f"[时间格式错误] {timestr} → {str(e)}") + return "1970-01-01 00:00:00" + + +def format_duration(seconds): + try: + seconds = int(seconds) + return f"{seconds // 60:02}:{seconds % 60:02}" + except Exception: + return "00:00" + + +headers1 = { + 'Accept': '*/*, */*', + # 'Accept-Encoding': 'gzip, deflate, br', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + # 'Content-Length': '6237', + 'Content-Type': 'application/json, application/json', + 'Host': 'graphql.api.dailymotion.com', + 'Origin': 'https://www.dailymotion.com', + 'Referer': 'https://www.dailymotion.com/', + 'Sec-Fetch-Dest': 'empty', + 'Sec-Fetch-Mode': 'cors', + 'Sec-Fetch-Site': 'same-site', + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36', + 'X-DM-AppInfo-Id': 'com.dailymotion.neon', + 'X-DM-AppInfo-Type': 'website', + 'X-DM-AppInfo-Version': 'v2025-04-28T12:37:52.391Z', + 'X-DM-Neon-SSR': '0', + 'X-DM-Preferred-Country': 'us', + 'accept-language': 'zh-CN', + 'authorization': 'Bearer 
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhaWQiOiJmMWEzNjJkMjg4YzFiOTgwOTljNyIsInJvbCI6ImNhbi1tYW5hZ2UtcGFydG5lcnMtcmVwb3J0cyBjYW4tcmVhZC12aWRlby1zdHJlYW1zIGNhbi1zcG9vZi1jb3VudHJ5IGNhbi1hZG9wdC11c2VycyBjYW4tcmVhZC1jbGFpbS1ydWxlcyBjYW4tbWFuYWdlLWNsYWltLXJ1bGVzIGNhbi1tYW5hZ2UtdXNlci1hbmFseXRpY3MgY2FuLXJlYWQtbXktdmlkZW8tc3RyZWFtcyBjYW4tZG93bmxvYWQtbXktdmlkZW9zIGFjdC1hcyBhbGxzY29wZXMgYWNjb3VudC1jcmVhdG9yIGNhbi1yZWFkLWFwcGxpY2F0aW9ucyIsInNjbyI6InJlYWQgd3JpdGUgZGVsZXRlIGVtYWlsIHVzZXJpbmZvIGZlZWQgbWFuYWdlX3ZpZGVvcyBtYW5hZ2VfY29tbWVudHMgbWFuYWdlX3BsYXlsaXN0cyBtYW5hZ2VfdGlsZXMgbWFuYWdlX3N1YnNjcmlwdGlvbnMgbWFuYWdlX2ZyaWVuZHMgbWFuYWdlX2Zhdm9yaXRlcyBtYW5hZ2VfbGlrZXMgbWFuYWdlX2dyb3VwcyBtYW5hZ2VfcmVjb3JkcyBtYW5hZ2Vfc3VidGl0bGVzIG1hbmFnZV9mZWF0dXJlcyBtYW5hZ2VfaGlzdG9yeSBpZnR0dCByZWFkX2luc2lnaHRzIG1hbmFnZV9jbGFpbV9ydWxlcyBkZWxlZ2F0ZV9hY2NvdW50X21hbmFnZW1lbnQgbWFuYWdlX2FuYWx5dGljcyBtYW5hZ2VfcGxheWVyIG1hbmFnZV9wbGF5ZXJzIG1hbmFnZV91c2VyX3NldHRpbmdzIG1hbmFnZV9jb2xsZWN0aW9ucyBtYW5hZ2VfYXBwX2Nvbm5lY3Rpb25zIG1hbmFnZV9hcHBsaWNhdGlvbnMgbWFuYWdlX2RvbWFpbnMgbWFuYWdlX3BvZGNhc3RzIiwibHRvIjoiZVdGV1JTSkdXRVZjVGg0eEYyRWpWblFlTHdrdUhTVjVPMGdrWGciLCJhaW4iOjEsImFkZyI6MSwiaWF0IjoxNzQ2MjU3NzI1LCJleHAiOjE3NDYyOTM1NjgsImRtdiI6IjEiLCJhdHAiOiJicm93c2VyIiwiYWRhIjoid3d3LmRhaWx5bW90aW9uLmNvbSIsInZpZCI6IjY0NjMzRDAzMDY1RjQxODZBRDBCMDI3Q0Y3OTVFRjBGIiwiZnRzIjo5MTE0MSwiY2FkIjoyLCJjeHAiOjIsImNhdSI6Miwia2lkIjoiQUY4NDlERDczQTU4NjNDRDdEOTdEMEJBQjA3MjI0M0IifQ.bMzShOLIb6datC92qGPTRVCW9eINTYDFwLtqed2P1d4', + 'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"', + 'sec-ch-ua-mobile': '?0', + 'sec-ch-ua-platform': '"Windows"', + 'x-dm-visit-id': '1745971699160', + 'x-dm-visitor-id': '64633D03065F4186AD0B027CF795EF0F', +} + +Gproxies = None + + +def get_proxies(g): + url = "https://www.kookeey.com/pickdynamicips" + params = { + "auth": "pwd", + "format": "1", + "n": "1", + "p": "http", + "gate": "sea", + "g": g, + "r": "0", + "type": "json", + "sign": "10099426b05c7119e9c4dbd6a7a0aa4e", + "accessid": "2207189", + "dl": "," + } + try: + response = requests.get(url, params=params) + except RequestException: + return get_proxies(g) + try: + proxy_data = response.json()['data'][0] + except Exception: + print(g) + print("数据返回解析错误!"+ str(response.text)) + time.sleep(5) + return get_proxies(g) + proxies_url = f"http://{proxy_data['username']}:{proxy_data['password']}@{proxy_data['ip']}:{proxy_data['port']}" + proxies = { + "http": proxies_url, + "https": proxies_url, + } + return proxies + + +def post_with_retry(url, json_payload=None, data=None, headers=None, proxies=None, + retries=5, timeout=10, backoff_factor=2, verbose=True): + token_refreshed = False + for attempt in range(1, retries + 1): + try: + proxy_str = db.get_proxy(Gproxies) + proxies = {"http": proxy_str, "https": proxy_str} + + resp = requests.post( + url, + json=json_payload, + data=data, + headers=headers, + proxies=proxies, + timeout=timeout + ) + if resp.status_code == 401 and not token_refreshed: + if verbose: + print("[post_with_retry] 收到 401,刷新 token 后重试") + gettoken() + token_refreshed = True + continue + + resp.raise_for_status() + return resp + + except RequestException as e: + if verbose: + print(f"[{attempt}/{retries}] 请求失败: {e}") + # 如果还没刷新过 token,就刷新一次 + if not token_refreshed: + if verbose: + print("[post_with_retry] 刷新 token 后再试") + gettoken() + token_refreshed = True + continue + if attempt == retries: + if verbose: + print(f"[post_with_retry] 最终失败:{url}") + return None + + sleep_time = backoff_factor * (2 ** (attempt - 1)) + if verbose: + 
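                # Exponential backoff before the next attempt: with the default
                # backoff_factor=2 this waits 2s, 4s, 8s, 16s for attempts 1-4.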
print(f"[post_with_retry] 等待 {sleep_time}s 后重试…") + time.sleep(sleep_time) + + +def gettoken(): + headers = { + 'host': 'graphql.api.dailymotion.com', + 'sec-ch-ua-platform': '"Windows"', + 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36 Edg/136.0.0.0', + 'sec-ch-ua': '"Chromium";v="136", "Microsoft Edge";v="136", "Not.A/Brand";v="99"', + 'content-type': 'application/x-www-form-urlencoded', + 'sec-ch-ua-mobile': '?0', + 'accept': '*/*', + 'origin': 'https://www.dailymotion.com', + 'sec-fetch-site': 'same-site', + 'sec-fetch-mode': 'cors', + 'sec-fetch-dest': 'empty', + 'referer': 'https://www.dailymotion.com/', + 'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6', + 'priority': 'u=1, i', + } + u = uuid.uuid4() + uuid_with_dash = str(u) + uuid_no_dash = u.hex + data = { + 'client_id': 'f1a362d288c1b98099c7', + 'client_secret': 'eea605b96e01c796ff369935357eca920c5da4c5', + 'grant_type': 'client_credentials', + 'traffic_segment': '244677', + 'visitor_id': uuid_with_dash, + } + try: + proxy_str = db.get_proxy(Gproxies) + url = 'https://graphql.api.dailymotion.com/oauth/token' + response = requests.post(url, headers=headers, data=data, proxies={"http": proxy_str, "https": proxy_str}) + token = response.json()['access_token'] + headers1['authorization'] = "Bearer " + token + headers1['x-dm-visit-id'] = str(int(time.time() * 1000)) + headers1['x-dm-visitor-id'] = uuid_no_dash + except Exception as e: + print(str(e)) + pass + + +def get_searchInfo(keyword): + video_list = [] + for j in range(1, 3): + # 别展开 = = ! + data = { + "operationName": "SEARCH_QUERY", + "variables": { + "query": keyword, + "shouldIncludeTopResults": True, + "shouldIncludeChannels": False, + "shouldIncludePlaylists": False, + "shouldIncludeHashtags": False, + "shouldIncludeVideos": False, + "shouldIncludeLives": False, + "page": j, + "limit": 100, + "recaptchaToken": None + }, + "query": """ + fragment VIDEO_BASE_FRAGMENT on Video { + id + xid + title + createdAt + duration + aspectRatio + thumbnail(height: PORTRAIT_240) { + id + url + __typename + } + creator { + id + xid + name + displayName + accountType + avatar(height: SQUARE_60) { + id + url + __typename + } + __typename + } + __typename + } + + fragment CHANNEL_BASE_FRAG on Channel { + id + xid + name + displayName + accountType + isFollowed + avatar(height: SQUARE_120) { + id + url + __typename + } + followerEngagement { + id + followDate + __typename + } + metrics { + id + engagement { + id + followers { + edges { + node { + id + total + __typename + } + __typename + } + __typename + } + __typename + } + __typename + } + __typename + } + + fragment PLAYLIST_BASE_FRAG on Collection { + id + xid + name + description + thumbnail(height: PORTRAIT_240) { + id + url + __typename + } + creator { + id + xid + name + displayName + accountType + avatar(height: SQUARE_60) { + id + url + __typename + } + __typename + } + metrics { + id + engagement { + id + videos(filter: {visibility: {eq: PUBLIC}}) { + edges { + node { + id + total + __typename + } + __typename + } + __typename + } + __typename + } + __typename + } + __typename + } + + fragment HASHTAG_BASE_FRAG on Hashtag { + id + xid + name + metrics { + id + engagement { + id + videos { + edges { + node { + id + total + __typename + } + __typename + } + __typename + } + __typename + } + __typename + } + __typename + } + + fragment LIVE_BASE_FRAGMENT on Live { + id + xid + title + audienceCount + aspectRatio + isOnAir + 
thumbnail(height: PORTRAIT_240) { + id + url + __typename + } + creator { + id + xid + name + displayName + accountType + avatar(height: SQUARE_60) { + id + url + __typename + } + __typename + } + __typename + } + + query SEARCH_QUERY($query: String!, $shouldIncludeTopResults: Boolean!, $shouldIncludeVideos: Boolean!, $shouldIncludeChannels: Boolean!, $shouldIncludePlaylists: Boolean!, $shouldIncludeHashtags: Boolean!, $shouldIncludeLives: Boolean!, $page: Int, $limit: Int, $sortByVideos: SearchVideoSort, $durationMinVideos: Int, $durationMaxVideos: Int, $createdAfterVideos: DateTime, $recaptchaToken: String) { + search(token: $recaptchaToken) { + id + stories(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeTopResults) { + metadata { + id + algorithm { + uuid + __typename + } + __typename + } + pageInfo { + hasNextPage + nextPage + __typename + } + edges { + node { + ...VIDEO_BASE_FRAGMENT + ...CHANNEL_BASE_FRAG + ...PLAYLIST_BASE_FRAG + ...HASHTAG_BASE_FRAG + ...LIVE_BASE_FRAGMENT + __typename + } + __typename + } + __typename + } + videos( + query: $query + first: $limit + page: $page + sort: $sortByVideos + durationMin: $durationMinVideos + durationMax: $durationMaxVideos + createdAfter: $createdAfterVideos + ) @include(if: $shouldIncludeVideos) { + metadata { + id + algorithm { + uuid + __typename + } + __typename + } + pageInfo { + hasNextPage + nextPage + __typename + } + edges { + node { + id + ...VIDEO_BASE_FRAGMENT + __typename + } + __typename + } + __typename + } + lives(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeLives) { + metadata { + id + algorithm { + uuid + __typename + } + __typename + } + pageInfo { + hasNextPage + nextPage + __typename + } + edges { + node { + id + ...LIVE_BASE_FRAGMENT + __typename + } + __typename + } + __typename + } + channels(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeChannels) { + metadata { + id + algorithm { + uuid + __typename + } + __typename + } + pageInfo { + hasNextPage + nextPage + __typename + } + edges { + node { + id + ...CHANNEL_BASE_FRAG + __typename + } + __typename + } + __typename + } + playlists: collections(query: $query, first: $limit, page: $page) @include(if: $shouldIncludePlaylists) { + metadata { + id + algorithm { + uuid + __typename + } + __typename + } + pageInfo { + hasNextPage + nextPage + __typename + } + edges { + node { + id + ...PLAYLIST_BASE_FRAG + __typename + } + __typename + } + __typename + } + hashtags(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeHashtags) { + metadata { + id + algorithm { + uuid + __typename + } + __typename + } + pageInfo { + hasNextPage + nextPage + __typename + } + edges { + node { + id + ...HASHTAG_BASE_FRAG + __typename + } + __typename + } + __typename + } + __typename + } + } + """ + } + gettoken() + response = post_with_retry( + "https://graphql.api.dailymotion.com/", + json_payload=data, + headers=headers1, + proxies=None + ) + + jsondata = response.json() + try: + resinfo = jsondata['data']['search']['stories']['edges'] + print('resinfo :', len(resinfo)) + except Exception: + resinfo = [] + print(response.text) + print("返回字段解析错误!") + video_tasks = [] + for index, iteminfo in enumerate(resinfo): + calculated_index = index + 1 + (j - 1) * 100 + node = iteminfo['node'] + if node['__typename'] != "Video": + continue + creator = node['creator'] + video_tasks.append({ + "index": calculated_index, + "xid": node.get('xid'), + "node": node, + "creator": creator, + }) + + def safe_fetch(task, 
max_try=2): + attempt = 0 + while attempt < max_try: + try: + return fetch_video_detail(task) + except Exception as e: + attempt += 1 + print(f"[线程异常] {task['xid']} 获取失败: {str(e)}") + + node = task["node"] + creator = task["creator"] + avatar = creator.get("avatar", {}) + return { + "index": task["index"], + "v_id": node.get("id"), + "v_xid": task["xid"], + "link": "https://www.dailymotion.com/video/" + task["xid"], + "title": node.get("title"), + "createtime": node.get("createdAt"), + "duration": node.get("duration"), + "pic": node.get("thumbnail", {}).get("url"), + "view": 0, + "fans": 0, + "videos": 0, + "u_id": creator.get('id'), + "u_xid": creator.get('xid'), + "u_name": creator.get('name'), + "u_pic": avatar.get('url'), + "_region": Gproxies + } + + with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + results = list(executor.map(safe_fetch, video_tasks)) + + for result in results: + if result: + video_list.append(result) + return video_list + + +def fetch_video_detail(task): + xid = task["xid"] + v_info = get_videoInfo(xid) + node = task["node"] + creator = task["creator"] + avatar = creator.get("avatar", {}) + return { + "index": task["index"], + "v_id": node.get("id"), + "v_xid": xid, + "link": "https://www.dailymotion.com/video/" + xid, + "title": node.get("title"), + "createtime": node.get("createdAt"), + "duration": node.get("duration"), + "pic": node.get("thumbnail", {}).get("url"), + "view": v_info['view'], + "fans": v_info['fans'], + "videos": v_info['videos'], + "u_id": creator.get('id'), + "u_xid": creator.get('xid'), + "u_name": creator.get('name'), + "u_pic": avatar.get('url'), + "_region": proxies_address + } + + +def get_videoInfo(x_id, r=3): + payload = { + "operationName": "WATCHING_VIDEO", + "variables": { + "xid": x_id, + "isSEO": False + }, + "query": "fragment VIDEO_FRAGMENT on Video {\n id\n xid\n isPublished\n duration\n title\n description\n thumbnailx60: thumbnailURL(size: \"x60\")\n thumbnailx120: thumbnailURL(size: \"x120\")\n thumbnailx240: thumbnailURL(size: \"x240\")\n thumbnailx360: thumbnailURL(size: \"x360\")\n thumbnailx480: thumbnailURL(size: \"x480\")\n thumbnailx720: thumbnailURL(size: \"x720\")\n thumbnailx1080: thumbnailURL(size: \"x1080\")\n aspectRatio\n category\n categories(filter: {category: {eq: CONTENT_CATEGORY}}) {\n edges {\n node { id name slug __typename }\n __typename\n }\n __typename\n }\n iab_categories: categories(\n filter: {category: {eq: IAB_CATEGORY}, percentage: {gte: 70}}\n ) {\n edges {\n node { id slug __typename }\n __typename\n }\n __typename\n }\n bestAvailableQuality\n createdAt\n viewerEngagement {\n id\n liked\n favorited\n __typename\n }\n isPrivate\n isWatched\n isCreatedForKids\n isExplicit\n canDisplayAds\n videoWidth: width\n videoHeight: height\n status\n hashtags {\n edges {\n node { id name __typename }\n __typename\n }\n __typename\n }\n stats {\n id\n views { id total __typename }\n __typename\n }\n channel {\n __typename\n id\n xid\n name\n displayName\n isArtist\n logoURLx25: logoURL(size: \"x25\")\n logoURL(size: \"x60\")\n isFollowed\n accountType\n coverURLx375: coverURL(size: \"x375\")\n stats {\n id\n views { id total __typename }\n followers { id total __typename }\n videos { id total __typename }\n __typename\n }\n country { id codeAlpha2 __typename }\n organization @skip(if: $isSEO) {\n id\n xid\n owner { id xid __typename }\n __typename\n }\n }\n language { id codeAlpha2 __typename }\n tags {\n edges {\n node { id label __typename }\n __typename\n }\n __typename\n }\n 
moderation { id reviewedAt __typename }\n topics(whitelistedOnly: true, first: 3, page: 1) {\n edges {\n node {\n id\n xid\n name\n names {\n edges {\n node {\n id\n name\n language { id codeAlpha2 __typename }\n __typename\n }\n __typename\n }\n __typename\n }\n __typename\n }\n __typename\n }\n __typename\n }\n geoblockedCountries {\n id\n allowed\n denied\n __typename\n }\n transcript {\n edges {\n node { id timecode text __typename }\n __typename\n }\n __typename\n }\n __typename\n}\n\nfragment LIVE_FRAGMENT on Live {\n id\n xid\n startAt\n endAt\n isPublished\n title\n description\n thumbnailx60: thumbnailURL(size: \"x60\")\n thumbnailx120: thumbnailURL(size: \"x120\")\n thumbnailx240: thumbnailURL(size: \"x240\")\n thumbnailx360: thumbnailURL(size: \"x360\")\n thumbnailx480: thumbnailURL(size: \"x480\")\n thumbnailx720: thumbnailURL(size: \"x720\")\n thumbnailx1080: thumbnailURL(size: \"x1080\")\n aspectRatio\n category\n createdAt\n viewerEngagement { id liked favorited __typename }\n isPrivate\n isExplicit\n isCreatedForKids\n bestAvailableQuality\n canDisplayAds\n videoWidth: width\n videoHeight: height\n stats { id views { id total __typename } __typename }\n channel {\n __typename\n id\n xid\n name\n displayName\n isArtist\n logoURLx25: logoURL(size: \"x25\")\n logoURL(size: \"x60\")\n isFollowed\n accountType\n coverURLx375: coverURL(size: \"x375\")\n stats { id views { id total __typename } followers { id total __typename } videos { id total __typename } __typename }\n country { id codeAlpha2 __typename }\n organization @skip(if: $isSEO) { id xid owner { id xid __typename } __typename }\n }\n language { id codeAlpha2 __typename }\n tags { edges { node { id label __typename } __typename } __typename }\n moderation { id reviewedAt __typename }\n topics(whitelistedOnly: true, first: 3, page: 1) {\n edges { node { id xid name names { edges { node { id name language { id codeAlpha2 __typename } __typename } __typename } __typename } __typename } __typename }\n __typename\n }\n geoblockedCountries { id allowed denied __typename }\n __typename\n}\n\nquery WATCHING_VIDEO($xid: String!, $isSEO: Boolean!) {\n video: media(xid: $xid) {\n __typename\n ... on Video { id ...VIDEO_FRAGMENT __typename }\n ... 
on Live { id ...LIVE_FRAGMENT __typename }\n }\n}" + } + url = 'https://graphql.api.dailymotion.com/' + + response = post_with_retry( + url, + json_payload=payload, + headers=headers1, + proxies=None, + ) + jsondata = response.json() + try: + v_info = jsondata['data']['video']['channel']['stats'] + except Exception: + if r > 0: + return get_videoInfo(x_id=x_id, r=r - 1) + else: + return { + "view": '-', + "fans": '-', + "videos": '-', + } + return { + "view": v_info['views']['total'], + "fans": v_info['followers']['total'], + "videos": v_info['videos']['total'], + } + + +def integrate_data(): + while True: + keywords, flag = db.item_keyword() + if len(keywords) < 1: + time.sleep(30) + else: + for index, (payload, kitem) in enumerate(keywords): + try: + global Gproxies + Gproxies = proxies_address[kitem['rn']] + v_list = get_searchInfo(kitem['keyword']) + + if not v_list: + for i in range(3): + time.sleep(i * 5) + v_list = get_searchInfo(kitem["keyword"]) + if v_list: + break + time.sleep(2) + + + for item in v_list: + record = { + "keyword": kitem.get("keyword"), + "v_name" : kitem.get("v_name"), + "v_id": item.get("v_id"), + "v_xid": item.get("v_xid"), + "link": item.get("link"), + "title": item.get("title"), + "duration": format_duration(item.get("duration")), + "fans": clean_dash_to_zero(item.get("fans", 0)), + "videos": clean_dash_to_zero(item.get("videos", 0)), + "watch_number": clean_dash_to_zero(item.get("view", 0)), + "create_time": format_create_time(item.get("createtime")), + "cover_pic": item.get("pic"), + "index": item.get("index", 0), + "u_id": item.get("u_id"), + "u_xid": item.get("u_xid"), + "u_name": item.get("u_name"), + "u_pic": item.get("u_pic"), + "rn": kitem.get("rn"), + "batch": kitem['batch'], + "machine_id": MACHINE_ID, + "level": kitem['level'], + } + db.upsert_video(record) + db.flush() + except Exception as e: + print(f"[异常] {str(e.__class__.__name__)}: {str(e)}") + print(f"[异常] 处理关键词 {kitem['keyword']} 时发生错误,正在回滚...") + time.sleep(5) + remaining_payloads = [p for p, _ in keywords[index:]] + if flag == 2: + db.rollback(remaining_payloads) + elif flag == 1: + db.rollback_records(remaining_payloads) + time.sleep(5) + break + +def parse_args() -> argparse.Namespace: + global MACHINE_ID, MAX_WORKERS + + parser = argparse.ArgumentParser( + description="Configure worker settings." 
+ ) + parser.add_argument( + "-m", "--machine-id", + type=int, + help=f"Machine identifier (default: {MACHINE_ID})" + ) + parser.add_argument( + "-w", "--max-workers", + type=int, + help=f"Maximum concurrent workers (default: {MAX_WORKERS})" + ) + + args = parser.parse_args() + + if args.machine_id is not None: + MACHINE_ID = args.machine_id + + if args.max_workers is not None: + if args.max_workers <= 0: + parser.error("--max-workers 不能是 0") + MAX_WORKERS = args.max_workers + if MACHINE_ID is None: + raise ValueError("请指定机器编号") + return args + +if __name__ == '__main__': + parse_args() + start_time = datetime.datetime.now() + print(f"开始时间:{start_time.strftime('%Y-%m-%d %H:%M:%S')}") + integrate_data() + end_time = datetime.datetime.now() + duration = end_time - start_time \ No newline at end of file diff --git a/multi_proxy_refill.py b/multi_proxy_refill.py new file mode 100644 index 0000000..e61972d --- /dev/null +++ b/multi_proxy_refill.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import time +import sys +import argparse +import redis +import requests +from requests import RequestException +from urllib.parse import quote + +# —— 固定 Redis 连接配置 —— # +_REDIS_CONF = { + "host": "127.0.0.1", + "port": 6379, + "password": "qwert@$123!&", + "decode_responses": True, +} + +# 阈值与批次参数 +QUEUE_PREFIX = "proxy_queue" +LOW_WATERMARK = 200 +REFILL_BATCH = 1000 +SLEEP_INTERVAL = 10 + +# 区域映射 +PROXIES_ADDRESS = { + "印度尼西亚": "ID", + "马来": "MY", + "加拿大": "CA", + "台湾": "TW", #"CN_city_TW", + "泰国": "TH", + "美国": "US", + "西班牙": "ES", + "韩国": "KR", + "香港": "HK", #"CN_city_HK", + "越南": "VN", +} + + +# 第三方 API 参数 +PROXY_API_URL = "https://www.kookeey.com/pickdynamicips" +ACCESS_ID = "2207189" +SIGN = "10099426b05c7119e9c4dbd6a7a0aa4e" + + +def fetch_proxies(region_name: str, n: int) -> list[str]: + """ + 从第三方一次性请求 n 条代理,返回格式化列表 + """ + params = { + "auth": "pwd", + "format": "1", + "n": str(n), + "p": "http", + "gate": "sea", + "g": region_name, + "r": "0", + "type": "json", + "sign": SIGN, + "accessid": ACCESS_ID, + "dl": ",", + } + try: + resp = requests.get(PROXY_API_URL, params=params, timeout=10) + resp.raise_for_status() + data = resp.json() + except (RequestException, ValueError): + time.sleep(1) + return fetch_proxies(region_name, n) + + arr = data.get("data") or [] + if not arr: + time.sleep(1) + return fetch_proxies(region_name, n) + + result = [] + for item in arr: + user = quote(item["username"], safe="") + pwd = quote(item["password"], safe="") + ip = item["ip"] + port = item["port"] + result.append(f"http://{user}:{pwd}@{ip}:{port}") + + return result + + +def refill_queue(r: redis.Redis, region_name: str, region_code: str, + low: int, batch: int): + """ + 如果 region 对应的队列长度 < low,就一次性补 batch 条 + """ + key = f"{QUEUE_PREFIX}:{region_code}" + length = r.llen(key) + if length >= low: + return + + to_fetch = batch - length + print(f"[{time.strftime('%H:%M:%S')}] {key} 长度 {length} < {low},一次性拉取 {to_fetch} 条…") + proxies = fetch_proxies(region_name, to_fetch) + if proxies: + r.rpush(key, *proxies) + print(f"[{time.strftime('%H:%M:%S')}] 已入队 {len(proxies)} 条 → 新长度 {r.llen(key)}") + else: + print(f"[{time.strftime('%H:%M:%S')}] 拉取失败,无数据入队") + + +def main(): + p = argparse.ArgumentParser(description="多地区代理队列自动补给") + p.add_argument("--low", "-l", type=int, default=LOW_WATERMARK, + help="低水位阈值,队列长度低于此值时触发补给") + p.add_argument("--batch", "-b", type=int, default=REFILL_BATCH, + help="单次补给目标条数") + p.add_argument("--sleep", "-s", type=int, default=SLEEP_INTERVAL, + help="检测间隔秒数") + 
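    # Example invocation with the defaults made explicit:
    #   python3 multi_proxy_refill.py --low 200 --batch 1000 --sleep 10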
args = p.parse_args() + + r = redis.Redis(**_REDIS_CONF) + print(f"[*] 启动补给,监控前缀 `{QUEUE_PREFIX}:` 列表,低于 {args.low} 条即补至 {args.batch} 条") + + while True: + try: + for region_name, region_code in PROXIES_ADDRESS.items(): + refill_queue(r, region_name, region_code, args.low, args.batch) + time.sleep(args.sleep) + except Exception as e: + msg = str(e).encode("utf-8", "ignore").decode() + print(f"[ERROR {time.strftime('%H:%M:%S')}] {msg}", file=sys.stderr) + time.sleep(args.sleep) + + +if __name__ == "__main__": + main() diff --git a/mysql_to_xlsx.py b/mysql_to_xlsx.py new file mode 100644 index 0000000..15581ba --- /dev/null +++ b/mysql_to_xlsx.py @@ -0,0 +1,89 @@ +import pymysql +import pandas as pd +from datetime import datetime + +# 数据库连接配置 +db_config = { + "host": "192.144.230.75", + "port": 3306, + "user": "db_vidcon", + "password": "rexdK4fhCCiRE4BZ", + "database": "db_vidcon", + "charset": "utf8mb4", +} + +def get_rn_list(): + """获取所有地区列表""" + sql = "SELECT DISTINCT rn FROM sh_dm_video_op_v2;" + conn = pymysql.connect(**db_config) + df = pd.read_sql(sql, conn) + conn.close() + return df['rn'].tolist() + +def get_data_for_rn(rn: str) -> pd.DataFrame: + """针对指定 rn 拉取数据""" + # 注意:这里把 SQL 中的 rn 和 level 参数化 + sql = f""" + SELECT + op.id AS ID, + v.v_name AS 片名, + v.link AS 视频连接, + v.is_piracy AS 是否盗版, + op.`level` AS 优先级, + op.rn AS 地区, + NULL AS 投诉日期, + NULL AS 下线日期, + op.keyword AS 关键词, + v.title AS 标题, + v.duration AS 时长, + v.watch_number AS 观看数量, + v.public_time AS 上传时间, + v.u_pic AS 头像, + CASE + WHEN dup.cnt > 1 THEN 1 + ELSE 2 + END AS 是否重复, + op.sort AS 排序, + op.batch AS 批次, + op.machine AS 机器号, + v.u_id AS 用户id, + v.u_xid AS u_xid, + v.u_name AS 用户名称 + FROM sh_dm_video_op_v2 AS op + LEFT JOIN ( + SELECT + t.v_xid, + COUNT(*) AS cnt + FROM sh_dm_video_op_v2 AS t + WHERE t.batch IN (1747324254, 1747323990) + GROUP BY t.v_xid + ) AS dup + ON op.v_xid = dup.v_xid + LEFT JOIN sh_dm_video_v2 AS v + ON op.v_xid = v.v_xid + WHERE op.rn = %s + AND op.batch IN (1747324254, 1747323990) + ORDER BY op.id; + """ + conn = pymysql.connect(**db_config) + df = pd.read_sql(sql, conn, params=(rn)) + conn.close() + return df + +def export_all(): + """循环所有地区,导出 Excel""" + rn_list = get_rn_list() + for rn in rn_list: + df = get_data_for_rn(rn) + if df.empty: + continue + + timestamp = datetime.now().strftime("%Y%m%d") + safe_rn = rn.replace(" ", "_") # 如果地区名里有空格或特殊字符 + filename = f"{timestamp}_T0T1_{safe_rn}.xlsx" + + df.to_excel(filename, index=False) + print(f"已导出:{filename}") + +if __name__ == "__main__": + export_all() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..37427bd --- /dev/null +++ b/requirements.txt @@ -0,0 +1,20 @@ +async-timeout==4.0.3 +certifi==2025.4.26 +charset-normalizer==3.4.2 +et-xmlfile==1.1.0 +idna==3.10 +importlib-metadata==6.7.0 +lxml==5.4.0 +numpy==1.21.6 +openpyxl==3.1.3 +pandas==1.3.5 +pkg_resources==0.0.0 +PyMySQL==1.1.1 +python-dateutil==2.9.0.post0 +pytz==2025.2 +redis==5.0.8 +requests==2.31.0 +six==1.17.0 +typing_extensions==4.7.1 +urllib3==2.0.7 +zipp==3.15.0