import json
import random
import traceback
import argparse
import time
import uuid
import copy
import datetime
from threading import Lock
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
from requests import RequestException
from dateutil import parser as date_parser

from DB import DBVidcon, DBSA
from logger import logger

db = DBVidcon()
MACHINE_ID = None
MAX_WORKERS = 10


def get_part_ids(part_num: int, take: int, offset: int = 0):
    part_ids = list(range(offset, offset + take))
    if max(part_ids) >= part_num:
        raise ValueError(f"Shard id out of range: PART_IDS={part_ids} exceeds PART_NUM={part_num}")
    next_offset = offset + take
    if next_offset < part_num:
        print(f"[hint] The next machine should use offset={next_offset}")
    else:
        print("[hint] The current shards already cover the full range; no more machines are needed")
    return part_ids


def clean_dash_to_zero(val):
    """Map '-', '' and None to 0, otherwise coerce to int."""
    if val in ('-', '', None):
        return 0
    try:
        return int(val)
    except (ValueError, TypeError) as e:
        logger.exception(f"[field error] val = {val} → {str(e)}")
        return 0


def format_create_time(timestr):
    """Normalize an ISO-8601 timestamp to 'YYYY-MM-DD HH:MM:SS'."""
    try:
        dt = date_parser.isoparse(timestr)
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except Exception as e:
        logger.exception(f"[time format error] {timestr} → {str(e)}")
        return "1970-01-01 00:00:00"


def format_duration(seconds):
    """Convert a duration in seconds to an MM:SS string."""
    try:
        seconds = int(seconds)
        return f"{seconds // 60:02}:{seconds % 60:02}"
    except Exception:
        return "00:00"


# Template headers for the Dailymotion GraphQL API; the authorization value is
# replaced with a fresh token by gettoken().
headers1 = {
    'Accept': '*/*',
    # 'Accept-Encoding': 'gzip, deflate, br',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    # 'Content-Length': '6237',
    'Content-Type': 'application/json',
    'Host': 'graphql.api.dailymotion.com',
    'Origin': 'https://www.dailymotion.com',
    'Referer': 'https://www.dailymotion.com/',
    'Sec-Fetch-Dest': 'empty',
    'Sec-Fetch-Mode': 'cors',
    'Sec-Fetch-Site': 'same-site',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
    'X-DM-AppInfo-Id': 'com.dailymotion.neon',
    'X-DM-AppInfo-Type': 'website',
    'X-DM-AppInfo-Version': 'v2025-04-28T12:37:52.391Z',
    'X-DM-Neon-SSR': '0',
    'X-DM-Preferred-Country': 'us',
    'accept-language': 'zh-CN',
    'authorization': 'Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhaWQiOiJmMWEzNjJkMjg4YzFiOTgwOTljNyIsInJvbCI6ImNhbi1tYW5hZ2UtcGFydG5lcnMtcmVwb3J0cyBjYW4tcmVhZC12aWRlby1zdHJlYW1zIGNhbi1zcG9vZi1jb3VudHJ5IGNhbi1hZG9wdC11c2VycyBjYW4tcmVhZC1jbGFpbS1ydWxlcyBjYW4tbWFuYWdlLWNsYWltLXJ1bGVzIGNhbi1tYW5hZ2UtdXNlci1hbmFseXRpY3MgY2FuLXJlYWQtbXktdmlkZW8tc3RyZWFtcyBjYW4tZG93bmxvYWQtbXktdmlkZW9zIGFjdC1hcyBhbGxzY29wZXMgYWNjb3VudC1jcmVhdG9yIGNhbi1yZWFkLWFwcGxpY2F0aW9ucyIsInNjbyI6InJlYWQgd3JpdGUgZGVsZXRlIGVtYWlsIHVzZXJpbmZvIGZlZWQgbWFuYWdlX3ZpZGVvcyBtYW5hZ2VfY29tbWVudHMgbWFuYWdlX3BsYXlsaXN0cyBtYW5hZ2VfdGlsZXMgbWFuYWdlX3N1YnNjcmlwdGlvbnMgbWFuYWdlX2ZyaWVuZHMgbWFuYWdlX2Zhdm9yaXRlcyBtYW5hZ2VfbGlrZXMgbWFuYWdlX2dyb3VwcyBtYW5hZ2VfcmVjb3JkcyBtYW5hZ2Vfc3VidGl0bGVzIG1hbmFnZV9mZWF0dXJlcyBtYW5hZ2VfaGlzdG9yeSBpZnR0dCByZWFkX2luc2lnaHRzIG1hbmFnZV9jbGFpbV9ydWxlcyBkZWxlZ2F0ZV9hY2NvdW50X21hbmFnZW1lbnQgbWFuYWdlX2FuYWx5dGljcyBtYW5hZ2VfcGxheWVyIG1hbmFnZV9wbGF5ZXJzIG1hbmFnZV91c2VyX3NldHRpbmdzIG1hbmFnZV9jb2xsZWN0aW9ucyBtYW5hZ2VfYXBwX2Nvbm5lY3Rpb25zIG1hbmFnZV9hcHBsaWNhdGlvbnMgbWFuYWdlX2RvbWFpbnMgbWFuYWdlX3BvZGNhc3RzIiwibHRvIjoiZVdGV1JTSkdXRVZjVGg0eEYyRWpWblFlTHdrdUhTVjVPMGdrWGciLCJhaW4iOjEsImFkZyI6MSwiaWF0IjoxNzQ2MjU3NzI1LCJleHAiOjE3NDYyOTM1NjgsImRtdiI6IjEiLCJhdHAiOiJicm93c2VyIiwiYWRhIjoid3d3LmRhaWx5bW90aW9uLmNvbSIsInZpZCI6IjY0NjMzRDAzMDY1RjQxODZBRDBCMDI3Q0Y3OTVFRjBGIiwiZnRzIjo5MTE0MSwiY2FkIjoyLCJjeHAiOjIsImNhdSI6Miwia2lkIjoiQUY4NDlERDczQTU4NjNDRDdEOTdEMEJBQjA3MjI0M0IifQ.bMzShOLIb6datC92qGPTRVCW9eINTYDFwLtqed2P1d4',
    'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
    'x-dm-visit-id': '1745971699160',
    'x-dm-visitor-id': '64633D03065F4186AD0B027CF795EF0F',
}

_headers_cache = None  # most recently refreshed, known-good headers
_cache_lock = Lock()

Gproxies = None


def get_proxies(g):
    """Fetch one dynamic proxy for region `g` from the kookeey API."""
    url = "https://www.kookeey.com/pickdynamicips"
    params = {
        "auth": "pwd",
        "format": "1",
        "n": "1",
        "p": "http",
        "gate": "sea",
        "g": g,
        "r": "0",
        "type": "json",
        "sign": "10099426b05c7119e9c4dbd6a7a0aa4e",
        "accessid": "2207189",
        "dl": ",",
    }
    try:
        response = requests.get(url, params=params)
    except RequestException:
        return get_proxies(g)
    try:
        proxy_data = response.json()['data'][0]
    except Exception:
        logger.exception(g)
        logger.exception("Failed to parse proxy response: " + str(response.text))
        time.sleep(5)
        return get_proxies(g)
    proxies_url = f"http://{proxy_data['username']}:{proxy_data['password']}@{proxy_data['ip']}:{proxy_data['port']}"
    proxies = {
        "http": proxies_url,
        "https": proxies_url,
    }
    return proxies


def post_with_retry(url, proxy_name, json_payload=None, data=None, headers=None,
                    retries=5, timeout=10, backoff_factor=2, verbose=True):
    """POST through a proxy with exponential backoff and a single token refresh on 401/failure."""
    token_refreshed = False
    for attempt in range(1, retries + 1):
        try:
            proxy_str = db.get_proxy(proxy_name)
            proxies = {"http": proxy_str, "https": proxy_str}
            resp = requests.post(
                url,
                json=json_payload,
                data=data,
                headers=headers,
                proxies=proxies,
                timeout=timeout,
            )
            if resp.status_code == 401 and not token_refreshed:
                if verbose:
                    logger.info("[post_with_retry] got 401, refreshing token before retrying")
                headers = gettoken(proxy_name)  # use the refreshed headers on the next attempt
                token_refreshed = True
                continue
            resp.raise_for_status()
            return resp
        except RequestException as e:
            if verbose:
                logger.info(f"[{attempt}/{retries}] request failed: {e}")
            # Refresh the token once if we have not done so yet
            if not token_refreshed:
                if verbose:
                    logger.info("[post_with_retry] refreshing token, then retrying")
                headers = gettoken(proxy_name)
                token_refreshed = True
                continue
            if attempt == retries:
                if verbose:
                    logger.info(f"[post_with_retry] giving up on {url}")
                return None
            sleep_time = backoff_factor * (2 ** (attempt - 1))
            if verbose:
                logger.info(f"[post_with_retry] sleeping {sleep_time}s before the next attempt…")
            time.sleep(sleep_time)


def gettoken(proxy, r=2):
    """Fetch an anonymous OAuth token and build GraphQL headers; fall back to cached headers."""
    global _headers_cache
    headers = {
        'Accept': '*/*',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Content-Type': 'application/x-www-form-urlencoded',
        'Origin': 'https://www.dailymotion.com',
        'Pragma': 'no-cache',
        'Referer': 'https://www.dailymotion.com/',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-site',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36',
        'sec-ch-ua': '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
    }
    u = uuid.uuid4()
    uuid_with_dash = str(u)
    uuid_no_dash = u.hex
    traffic_segment = str(random.randint(100_000, 999_999))
    data = {
        'client_id': 'f1a362d288c1b98099c7',
        'client_secret': 'eea605b96e01c796ff369935357eca920c5da4c5',
        'grant_type': 'client_credentials',
        'traffic_segment': traffic_segment,
        'visitor_id': uuid_with_dash,
    }
    try:
        proxy_str = db.get_proxy(proxy)
        url = 'https://graphql.api.dailymotion.com/oauth/token'
        response = requests.post(url, headers=headers, data=data,
                                 proxies={"http": proxy_str, "https": proxy_str})
        token = response.json()['access_token']
        copy_headers = copy.deepcopy(headers1)
        copy_headers['authorization'] = "Bearer " + token
        copy_headers['x-dm-visit-id'] = str(int(time.time() * 1000))
        copy_headers['x-dm-visitor-id'] = uuid_no_dash
        with _cache_lock:
            _headers_cache = copy_headers
        return copy_headers
    except Exception as e:
        logger.exception(f"[gettoken] failed: {e}")
        if r > 0:
            time.sleep(5)
            return gettoken(proxy, r - 1)
        else:
            with _cache_lock:
                if _headers_cache:
                    logger.info("[gettoken] falling back to cached headers")
                    return copy.deepcopy(_headers_cache)
            # Still nothing → return the template headers (without a fresh Authorization)
            return copy.deepcopy(headers1)


def get_searchInfo(keyword, level, headers, proxy_name, r=2):
    if r == 2:
        logger.info(f"NET processing->{keyword},\trn->{proxy_name},\tlevel->{level}")
    video_list = []
    max_page = 2
    limit = 30
    if level == 0 or level == 1:
        max_page = 3
        limit = 100
    for j in range(1, max_page):  # don't expand = =!
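        # Build the SEARCH_QUERY GraphQL payload. Only the "stories" (top results)
        # connection is enabled; all other @include flags are sent as False.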
        data = {
            "operationName": "SEARCH_QUERY",
            "variables": {
                "query": keyword,
                "shouldIncludeTopResults": True,
                "shouldIncludeChannels": False,
                "shouldIncludePlaylists": False,
                "shouldIncludeHashtags": False,
                "shouldIncludeVideos": False,
                "shouldIncludeLives": False,
                "page": j,
                "limit": limit,
                "recaptchaToken": None
            },
            "query": """
fragment VIDEO_BASE_FRAGMENT on Video { id xid title createdAt duration aspectRatio thumbnail(height: PORTRAIT_240) { id url __typename } creator { id xid name displayName accountType avatar(height: SQUARE_60) { id url __typename } __typename } __typename }

fragment CHANNEL_BASE_FRAG on Channel { id xid name displayName accountType isFollowed avatar(height: SQUARE_120) { id url __typename } followerEngagement { id followDate __typename } metrics { id engagement { id followers { edges { node { id total __typename } __typename } __typename } __typename } __typename } __typename }

fragment PLAYLIST_BASE_FRAG on Collection { id xid name description thumbnail(height: PORTRAIT_240) { id url __typename } creator { id xid name displayName accountType avatar(height: SQUARE_60) { id url __typename } __typename } metrics { id engagement { id videos(filter: {visibility: {eq: PUBLIC}}) { edges { node { id total __typename } __typename } __typename } __typename } __typename } __typename }

fragment HASHTAG_BASE_FRAG on Hashtag { id xid name metrics { id engagement { id videos { edges { node { id total __typename } __typename } __typename } __typename } __typename } __typename }

fragment LIVE_BASE_FRAGMENT on Live { id xid title audienceCount aspectRatio isOnAir thumbnail(height: PORTRAIT_240) { id url __typename } creator { id xid name displayName accountType avatar(height: SQUARE_60) { id url __typename } __typename } __typename }

query SEARCH_QUERY($query: String!, $shouldIncludeTopResults: Boolean!, $shouldIncludeVideos: Boolean!, $shouldIncludeChannels: Boolean!, $shouldIncludePlaylists: Boolean!, $shouldIncludeHashtags: Boolean!, $shouldIncludeLives: Boolean!, $page: Int, $limit: Int, $sortByVideos: SearchVideoSort, $durationMinVideos: Int, $durationMaxVideos: Int, $createdAfterVideos: DateTime, $recaptchaToken: String) {
  search(token: $recaptchaToken) {
    id
    stories(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeTopResults) { metadata { id algorithm { uuid __typename } __typename } pageInfo { hasNextPage nextPage __typename } edges { node { ...VIDEO_BASE_FRAGMENT ...CHANNEL_BASE_FRAG ...PLAYLIST_BASE_FRAG ...HASHTAG_BASE_FRAG ...LIVE_BASE_FRAGMENT __typename } __typename } __typename }
    videos(query: $query, first: $limit, page: $page, sort: $sortByVideos, durationMin: $durationMinVideos, durationMax: $durationMaxVideos, createdAfter: $createdAfterVideos) @include(if: $shouldIncludeVideos) { metadata { id algorithm { uuid __typename } __typename } pageInfo { hasNextPage nextPage __typename } edges { node { id ...VIDEO_BASE_FRAGMENT __typename } __typename } __typename }
    lives(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeLives) { metadata { id algorithm { uuid __typename } __typename } pageInfo { hasNextPage nextPage __typename } edges { node { id ...LIVE_BASE_FRAGMENT __typename } __typename } __typename }
    channels(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeChannels) { metadata { id algorithm { uuid __typename } __typename } pageInfo { hasNextPage nextPage __typename } edges { node { id ...CHANNEL_BASE_FRAG __typename } __typename } __typename }
    playlists: collections(query: $query, first: $limit, page: $page) @include(if: $shouldIncludePlaylists) { metadata { id algorithm { uuid __typename } __typename } pageInfo { hasNextPage nextPage __typename } edges { node { id ...PLAYLIST_BASE_FRAG __typename } __typename } __typename }
    hashtags(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeHashtags) { metadata { id algorithm { uuid __typename } __typename } pageInfo { hasNextPage nextPage __typename } edges { node { id ...HASHTAG_BASE_FRAG __typename } __typename } __typename }
    __typename
  }
}
""",
        }
        response = post_with_retry(
            "https://graphql.api.dailymotion.com/",
            json_payload=data,
            headers=headers,
            proxy_name=proxy_name,
        )
        if response is None:
            return None
        jsondata = response.json()
        try:
            errors = jsondata.get("errors")  # GraphQL errors array
            stories = jsondata.get("data", {}).get("search", {}).get("stories")
            if errors or stories is None:  # errors present, or stories is null
                if r == 0:
                    logger.info("Three consecutive errors or empty results: "
                                + json.dumps(jsondata, ensure_ascii=False))
                    return None
                time.sleep((3 - r) * 5)
                return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
            resinfo = stories["edges"]
            logger.info(f"resinfo: {len(resinfo)}")
        except Exception:
            if r < 0:
                logger.exception("[search API] unhandled response: " + response.text)
                logger.exception("Failed to parse response fields!")
                return None
            else:
                time.sleep((3 - r) * 5)
                return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
        for index, iteminfo in enumerate(resinfo):
            calculated_index = index + 1 + (j - 1) * 100  # page offset assumes up to 100 results per page
            node = iteminfo['node']
            if node['__typename'] != "Video":
                continue
            creator = node['creator']
            duration = node.get('duration')
            if not duration or duration <= 300:  # skip videos of 5 minutes or less
                continue
            v_data = {
                "index": calculated_index,
                "v_id": node.get("id"),
                "v_xid": node.get('xid'),
                "link": "https://www.dailymotion.com/video/" + node.get('xid'),
                "title": node.get("title"),
                "createtime": node.get("createdAt"),
                "duration": node.get("duration"),
                "pic": node.get("thumbnail", {}).get("url"),
                "view": 0,
                "fans": 0,
                "videos": 0,
                "u_id": creator.get('id'),
                "u_xid": creator.get('xid'),
                "u_name": creator.get('name'),
                "u_pic": (creator.get('avatar') or {}).get('url'),  # creator avatar
            }
            video_list.append(v_data)
    return video_list


proxiesdict = db.get_proxy_agent_dict()


def search_worker(payload, kitem, flag):
    """Run one keyword search; returns (ok, flag, payload, kitem, video_list)."""
    try:
        gproxies = proxiesdict[kitem['rn']]
        header = gettoken(gproxies)
        v_list = get_searchInfo(kitem['keyword'], kitem['level'], header, gproxies)
        if not v_list:
            for i in range(2):
                time.sleep(i * 5)
                v_list = get_searchInfo(kitem['keyword'], kitem['level'], header, gproxies)
                if v_list:
                    break
                time.sleep(2)
            if not v_list:
                v_list = []
        return True, flag, payload, kitem, v_list  # success
    except Exception as e:
        logger.exception(f"[worker thread error] {kitem['keyword']} → {e}")
        traceback.print_exc()
        return False, flag, payload, kitem, []  # failure


def integrate_data_parallel():
    # Create the pool here so a --max-workers value parsed on the command line takes effect.
    executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
    while True:
        tasks, flag = db.item_keyword()
        if not tasks:
            time.sleep(10)
            continue
        futures = [
            executor.submit(search_worker, payload, kitem, flag)
            for payload, kitem in tasks
        ]
        # Payloads of failed tasks, grouped by priority level, for rollback
        rollback = {0: [], 1: [], 2: []}
        for fut in as_completed(futures):
            ok, f_flag, payload, kitem, v_list = fut.result()
            if not ok:
                rollback[f_flag].append(payload)
                continue
            for item in v_list:
                if not item:
                    continue
                DBSA.upsert_video({
                    "keyword": kitem["keyword"],
                    "v_name": kitem["v_name"],
                    "v_id": item["v_id"],
                    "v_xid": item["v_xid"],
                    "link": item["link"],
                    "title": item["title"],
                    "duration": format_duration(item["duration"]),
                    "fans": clean_dash_to_zero(item["fans"]),
                    "videos": clean_dash_to_zero(item["videos"]),
                    "watch_number": clean_dash_to_zero(item["view"]),
                    "create_time": format_create_time(item["createtime"]),
                    "cover_pic": item["pic"],
                    "index": item["index"],
                    "u_id": item["u_id"],
                    "u_xid": item["u_xid"],
                    "u_name": item["u_name"],
                    "u_pic": item["u_pic"],
                    "rn": kitem["rn"],
                    "batch": kitem["batch"],
                    "machine_id": MACHINE_ID,
                    "level": kitem["level"],
                })
        DBSA.flush()
        if rollback[0]:
            db.rollback_l0(rollback[0])
        if rollback[1]:
            db.rollback_l1(rollback[1])
        if rollback[2]:
            db.rollback_l2(rollback[2])


def parse_args() -> argparse.Namespace:
    global MACHINE_ID, MAX_WORKERS

    parser = argparse.ArgumentParser(
        description="Configure worker settings."
    )
    parser.add_argument(
        "-m", "--machine-id",
        type=int,
        help=f"Machine identifier (default: {MACHINE_ID})"
    )
    parser.add_argument(
        "-w", "--max-workers",
        type=int,
        help=f"Maximum concurrent workers (default: {MAX_WORKERS})"
    )
    args = parser.parse_args()

    if args.machine_id is not None:
        MACHINE_ID = args.machine_id
    if args.max_workers is not None:
        if args.max_workers <= 0:
            parser.error("--max-workers must be a positive integer")
        MAX_WORKERS = args.max_workers
    if MACHINE_ID is None:
        raise ValueError("A machine id must be provided (-m/--machine-id)")

    return args


if __name__ == '__main__':
    parse_args()
    start_time = datetime.datetime.now()
    logger.info(f"Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
    integrate_data_parallel()
    end_time = datetime.datetime.now()
    duration = end_time - start_time
    logger.info(f"End time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}, elapsed: {duration}")
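
# External interfaces this script relies on (exactly as called above):
#   DB.DBVidcon - get_proxy(), get_proxy_agent_dict(), item_keyword(),
#                 rollback_l0() / rollback_l1() / rollback_l2()
#   DB.DBSA     - upsert_video(), flush()
#   logger      - shared logging handle
# Example invocation (the file name below is illustrative, not part of the source):
#   python dm_search.py -m 0 -w 10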