import random
from urllib.parse import quote
import argparse
import time
import uuid
import concurrent.futures
import requests
import datetime
from requests import RequestException
from DB import DBVidcon
from dateutil import parser as date_parser

batch = str(int(time.time()))
db = DBVidcon()
MACHINE_ID = None
MAX_WORKERS = 10


def get_part_ids(part_num: int, take: int, offset: int = 0):
    """Compute this machine's slice of partition ids and print the next machine's offset."""
    part_ids = list(range(offset, offset + take))
    if max(part_ids) >= part_num:
        raise ValueError(f"Partition ids out of range: PART_IDS={part_ids} exceeds PART_NUM={part_num}")
    next_offset = offset + take
    if next_offset < part_num:
        print(f"[Hint] The next machine's offset should be: {next_offset}")
    else:
        print("[Hint] The current partition already reaches the end; no more machines are needed.")
    return part_ids


def clean_dash_to_zero(val):
    """Coerce '-', empty string or None to 0; otherwise cast to int."""
    if val in ('-', '', None):
        return 0
    try:
        return int(val)
    except (ValueError, TypeError) as e:
        print(f"[Field error] val = {val} → {str(e)}")
        return 0


def format_create_time(timestr):
    """Normalize an ISO-8601 timestamp to 'YYYY-MM-DD HH:MM:SS'."""
    try:
        dt = date_parser.isoparse(timestr)
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except Exception as e:
        print(f"[Time format error] {timestr} → {str(e)}")
        return "1970-01-01 00:00:00"


def format_duration(seconds):
    """Render a duration given in seconds as MM:SS."""
    try:
        seconds = int(seconds)
        return f"{seconds // 60:02}:{seconds % 60:02}"
    except Exception:
        return "00:00"


# Baseline headers for the GraphQL endpoint; authorization, x-dm-visit-id and
# x-dm-visitor-id are refreshed at runtime by gettoken().
headers1 = {
    'Accept': '*/*, */*',
    # 'Accept-Encoding': 'gzip, deflate, br',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    # 'Content-Length': '6237',
    'Content-Type': 'application/json, application/json',
    'Host': 'graphql.api.dailymotion.com',
    'Origin': 'https://www.dailymotion.com',
    'Referer': 'https://www.dailymotion.com/',
    'Sec-Fetch-Dest': 'empty',
    'Sec-Fetch-Mode': 'cors',
    'Sec-Fetch-Site': 'same-site',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
    'X-DM-AppInfo-Id': 'com.dailymotion.neon',
    'X-DM-AppInfo-Type': 'website',
    'X-DM-AppInfo-Version': 'v2025-04-28T12:37:52.391Z',
    'X-DM-Neon-SSR': '0',
    'X-DM-Preferred-Country': 'us',
    'accept-language': 'zh-CN',
    'authorization': 'Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhaWQiOiJmMWEzNjJkMjg4YzFiOTgwOTljNyIsInJvbCI6ImNhbi1tYW5hZ2UtcGFydG5lcnMtcmVwb3J0cyBjYW4tcmVhZC12aWRlby1zdHJlYW1zIGNhbi1zcG9vZi1jb3VudHJ5IGNhbi1hZG9wdC11c2VycyBjYW4tcmVhZC1jbGFpbS1ydWxlcyBjYW4tbWFuYWdlLWNsYWltLXJ1bGVzIGNhbi1tYW5hZ2UtdXNlci1hbmFseXRpY3MgY2FuLXJlYWQtbXktdmlkZW8tc3RyZWFtcyBjYW4tZG93bmxvYWQtbXktdmlkZW9zIGFjdC1hcyBhbGxzY29wZXMgYWNjb3VudC1jcmVhdG9yIGNhbi1yZWFkLWFwcGxpY2F0aW9ucyIsInNjbyI6InJlYWQgd3JpdGUgZGVsZXRlIGVtYWlsIHVzZXJpbmZvIGZlZWQgbWFuYWdlX3ZpZGVvcyBtYW5hZ2VfY29tbWVudHMgbWFuYWdlX3BsYXlsaXN0cyBtYW5hZ2VfdGlsZXMgbWFuYWdlX3N1YnNjcmlwdGlvbnMgbWFuYWdlX2ZyaWVuZHMgbWFuYWdlX2Zhdm9yaXRlcyBtYW5hZ2VfbGlrZXMgbWFuYWdlX2dyb3VwcyBtYW5hZ2VfcmVjb3JkcyBtYW5hZ2Vfc3VidGl0bGVzIG1hbmFnZV9mZWF0dXJlcyBtYW5hZ2VfaGlzdG9yeSBpZnR0dCByZWFkX2luc2lnaHRzIG1hbmFnZV9jbGFpbV9ydWxlcyBkZWxlZ2F0ZV9hY2NvdW50X21hbmFnZW1lbnQgbWFuYWdlX2FuYWx5dGljcyBtYW5hZ2VfcGxheWVyIG1hbmFnZV9wbGF5ZXJzIG1hbmFnZV91c2VyX3NldHRpbmdzIG1hbmFnZV9jb2xsZWN0aW9ucyBtYW5hZ2VfYXBwX2Nvbm5lY3Rpb25zIG1hbmFnZV9hcHBsaWNhdGlvbnMgbWFuYWdlX2RvbWFpbnMgbWFuYWdlX3BvZGNhc3RzIiwibHRvIjoiZVdGV1JTSkdXRVZjVGg0eEYyRWpWblFlTHdrdUhTVjVPMGdrWGciLCJhaW4iOjEsImFkZyI6MSwiaWF0IjoxNzQ2MjU3NzI1LCJleHAiOjE3NDYyOTM1NjgsImRtdiI6IjEiLCJhdHAiOiJicm93c2VyIiwiYWRhIjoid3d3LmRhaWx5bW90aW9uLmNvbSIsInZpZCI6IjY0NjMzRDAzMDY1RjQxODZBRDBCMDI3Q0Y3OTVFRjBGIiwiZnRzIjo5MTE0MSwiY2FkIjoyLCJjeHAiOjIsImNhdSI6Miwia2lkIjoiQUY4NDlERDczQTU4NjNDRDdEOTdEMEJBQjA3MjI0M0IifQ.bMzShOLIb6datC92qGPTRVCW9eINTYDFwLtqed2P1d4',
    'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
    'x-dm-visit-id': '1745971699160',
    'x-dm-visitor-id': '64633D03065F4186AD0B027CF795EF0F',
}

Gproxies = None


def get_proxies(g):
    """Fetch one fresh HTTP proxy for gateway `g` from the kookeey API, retrying on failure."""
    url = "https://www.kookeey.com/pickdynamicips"
    params = {
        "auth": "pwd",
        "format": "1",
        "n": "1",
        "p": "http",
        "gate": "sea",
        "g": g,
        "r": "0",
        "type": "json",
        "sign": "10099426b05c7119e9c4dbd6a7a0aa4e",
        "accessid": "2207189",
        "dl": ",",
    }
    try:
        response = requests.get(url, params=params)
    except RequestException:
        return get_proxies(g)
    try:
        proxy_data = response.json()['data'][0]
    except Exception:
        print(g)
        print("Failed to parse proxy response! " + str(response.text))
        time.sleep(5)
        return get_proxies(g)
    proxies_url = f"http://{proxy_data['username']}:{proxy_data['password']}@{proxy_data['ip']}:{proxy_data['port']}"
    proxies = {
        "http": proxies_url,
        "https": proxies_url,
    }
    return proxies


def post_with_retry(url, json_payload=None, data=None, headers=None, proxies=None,
                    retries=5, timeout=10, backoff_factor=2, verbose=True):
    """POST with a fresh proxy per attempt, a single token refresh on 401/errors, and exponential backoff."""
    token_refreshed = False
    for attempt in range(1, retries + 1):
        try:
            # Any proxies passed in are overridden by one drawn from the DB on every attempt.
            proxy_str = db.get_proxy(Gproxies)
            proxies = {"http": proxy_str, "https": proxy_str}
            resp = requests.post(
                url,
                json=json_payload,
                data=data,
                headers=headers,
                proxies=proxies,
                timeout=timeout
            )
            if resp.status_code == 401 and not token_refreshed:
                if verbose:
                    print("[post_with_retry] got 401, refreshing token and retrying")
                gettoken()
                token_refreshed = True
                continue
            resp.raise_for_status()
            return resp
        except RequestException as e:
            if verbose:
                print(f"[{attempt}/{retries}] request failed: {e}")
            # If the token has not been refreshed yet, refresh it once before retrying.
            if not token_refreshed:
                if verbose:
                    print("[post_with_retry] refreshing token and retrying")
                gettoken()
                token_refreshed = True
                continue
            if attempt == retries:
                if verbose:
                    print(f"[post_with_retry] giving up on {url}")
                return None
            sleep_time = backoff_factor * (2 ** (attempt - 1))
            if verbose:
                print(f"[post_with_retry] waiting {sleep_time}s before the next attempt…")
            time.sleep(sleep_time)


def gettoken():
    """Obtain a client-credentials OAuth token and patch it into headers1."""
    headers = {
        'Accept': '*/*',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Content-Type': 'application/x-www-form-urlencoded',
        'Origin': 'https://www.dailymotion.com',
        'Pragma': 'no-cache',
        'Referer': 'https://www.dailymotion.com/',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-site',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36',
        'sec-ch-ua': '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
    }
    u = uuid.uuid4()
    uuid_with_dash = str(u)
    uuid_no_dash = u.hex
    traffic_segment = str(random.randint(100_000, 999_999))
    data = {
        'client_id': 'f1a362d288c1b98099c7',
        'client_secret': 'eea605b96e01c796ff369935357eca920c5da4c5',
        'grant_type': 'client_credentials',
        'traffic_segment': traffic_segment,
        'visitor_id': uuid_with_dash,
    }
    try:
        proxy_str = db.get_proxy(Gproxies)
        url = 'https://graphql.api.dailymotion.com/oauth/token'
        response = requests.post(url, headers=headers, data=data,
                                 proxies={"http": proxy_str, "https": proxy_str})
        token = response.json()['access_token']
        headers1['authorization'] = "Bearer " + token
        headers1['x-dm-visit-id'] = str(int(time.time() * 1000))
        headers1['x-dm-visitor-id'] = uuid_no_dash
    except Exception as e:
        print(str(e))


def get_searchInfo(keyword, level):
    """Query Dailymotion's GraphQL search for `keyword` and return video rows longer than 5 minutes."""
    video_list = []
    max_page = 2
    limit = 30
    if level == 0 or level == 1:
        max_page = 3
        limit = 100
    for j in range(1, max_page):  # note: range() stops before max_page
        # don't expand this = = !
        data = {
            "operationName": "SEARCH_QUERY",
            "variables": {
                "query": keyword,
                "shouldIncludeTopResults": True,
                "shouldIncludeChannels": False,
                "shouldIncludePlaylists": False,
                "shouldIncludeHashtags": False,
                "shouldIncludeVideos": False,
                "shouldIncludeLives": False,
                "page": j,
                "limit": limit,
                "recaptchaToken": None
            },
            "query": """
fragment VIDEO_BASE_FRAGMENT on Video { id xid title createdAt duration aspectRatio thumbnail(height: PORTRAIT_240) { id url __typename } creator { id xid name displayName accountType avatar(height: SQUARE_60) { id url __typename } __typename } __typename }
fragment CHANNEL_BASE_FRAG on Channel { id xid name displayName accountType isFollowed avatar(height: SQUARE_120) { id url __typename } followerEngagement { id followDate __typename } metrics { id engagement { id followers { edges { node { id total __typename } __typename } __typename } __typename } __typename } __typename }
fragment PLAYLIST_BASE_FRAG on Collection { id xid name description thumbnail(height: PORTRAIT_240) { id url __typename } creator { id xid name displayName accountType avatar(height: SQUARE_60) { id url __typename } __typename } metrics { id engagement { id videos(filter: {visibility: {eq: PUBLIC}}) { edges { node { id total __typename } __typename } __typename } __typename } __typename } __typename }
fragment HASHTAG_BASE_FRAG on Hashtag { id xid name metrics { id engagement { id videos { edges { node { id total __typename } __typename } __typename } __typename } __typename } __typename }
fragment LIVE_BASE_FRAGMENT on Live { id xid title audienceCount aspectRatio isOnAir thumbnail(height: PORTRAIT_240) { id url __typename } creator { id xid name displayName accountType avatar(height: SQUARE_60) { id url __typename } __typename } __typename }
query SEARCH_QUERY($query: String!, $shouldIncludeTopResults: Boolean!, $shouldIncludeVideos: Boolean!, $shouldIncludeChannels: Boolean!, $shouldIncludePlaylists: Boolean!, $shouldIncludeHashtags: Boolean!, $shouldIncludeLives: Boolean!, $page: Int, $limit: Int, $sortByVideos: SearchVideoSort, $durationMinVideos: Int, $durationMaxVideos: Int, $createdAfterVideos: DateTime, $recaptchaToken: String) {
  search(token: $recaptchaToken) {
    id
    stories(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeTopResults) { metadata { id algorithm { uuid __typename } __typename } pageInfo { hasNextPage nextPage __typename } edges { node { ...VIDEO_BASE_FRAGMENT ...CHANNEL_BASE_FRAG ...PLAYLIST_BASE_FRAG ...HASHTAG_BASE_FRAG ...LIVE_BASE_FRAGMENT __typename } __typename } __typename }
    videos( query: $query first: $limit page: $page sort: $sortByVideos durationMin: $durationMinVideos durationMax: $durationMaxVideos createdAfter: $createdAfterVideos ) @include(if: $shouldIncludeVideos) { metadata { id algorithm { uuid __typename } __typename } pageInfo { hasNextPage nextPage __typename } edges { node { id ...VIDEO_BASE_FRAGMENT __typename } __typename } __typename }
    lives(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeLives) { metadata { id algorithm { uuid __typename } __typename } pageInfo { hasNextPage nextPage __typename } edges { node { id ...LIVE_BASE_FRAGMENT __typename } __typename } __typename }
    channels(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeChannels) { metadata { id algorithm { uuid __typename } __typename } pageInfo { hasNextPage nextPage __typename } edges { node { id ...CHANNEL_BASE_FRAG __typename } __typename } __typename }
    playlists: collections(query: $query, first: $limit, page: $page) @include(if: $shouldIncludePlaylists) { metadata { id algorithm { uuid __typename } __typename } pageInfo { hasNextPage nextPage __typename } edges { node { id ...PLAYLIST_BASE_FRAG __typename } __typename } __typename }
    hashtags(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeHashtags) { metadata { id algorithm { uuid __typename } __typename } pageInfo { hasNextPage nextPage __typename } edges { node { id ...HASHTAG_BASE_FRAG __typename } __typename } __typename }
    __typename
  }
}
"""
        }
        response = post_with_retry(
            "https://graphql.api.dailymotion.com/",
            json_payload=data,
            headers=headers1,
            proxies=None
        )
        if response is None:
            # post_with_retry exhausted its retries; skip this page.
            continue
        jsondata = response.json()
        try:
            resinfo = jsondata['data']['search']['stories']['edges']
            print('resinfo :', len(resinfo))
        except Exception:
            resinfo = []
            print("[search API]", response.text)
            print("Failed to parse response fields!")
        for index, iteminfo in enumerate(resinfo):
            calculated_index = index + 1 + (j - 1) * 100
            node = iteminfo['node']
            if node['__typename'] != "Video":
                continue
            creator = node['creator']
            duration = node.get('duration')
            # Keep only videos longer than 5 minutes.
            if duration is None or duration <= 300:
                continue
            v_xid = node.get('xid')
            v_data = {
                "index": calculated_index,
                "v_id": node.get("id"),
                "v_xid": v_xid,
                "link": "https://www.dailymotion.com/video/" + str(v_xid),
                "title": node.get("title"),
                "createtime": node.get("createdAt"),
                "duration": node.get("duration"),
                "pic": node.get("thumbnail", {}).get("url"),
                "view": 0,
                "fans": 0,
                "videos": 0,
                "u_id": creator.get('id'),
                "u_xid": creator.get('xid'),
                "u_name": creator.get('name'),
                "u_pic": creator.get('avatar', {}).get('url'),  # creator avatar requested by the query
            }
            video_list.append(v_data)
    return video_list


def integrate_data():
    """Main worker loop: pull keyword batches, scrape search results, upsert records, roll back on errors."""
    global Gproxies
    while True:
        keywords, flag = db.item_keyword()
        if len(keywords) < 1:
            time.sleep(30)
        else:
            for index, (payload, kitem) in enumerate(keywords):
                try:
                    proxiesdict = db.get_proxy_agent_dict()
                    Gproxies = proxiesdict[kitem['rn']]
                    gettoken()
                    v_list = get_searchInfo(kitem['keyword'], kitem['level'])
                    if not v_list:
                        # Retry up to three times with an increasing delay.
                        for i in range(3):
                            time.sleep(i * 5)
                            v_list = get_searchInfo(kitem["keyword"], kitem['level'])
                            if v_list:
                                break
                            time.sleep(2)
                    for item in v_list:
                        record = {
                            "keyword": kitem.get("keyword"),
                            "v_name": kitem.get("v_name"),
                            "v_id": item.get("v_id"),
                            "v_xid": item.get("v_xid"),
                            "link": item.get("link"),
                            "title": item.get("title"),
                            "duration": format_duration(item.get("duration")),
                            "fans": clean_dash_to_zero(item.get("fans", 0)),
                            "videos": clean_dash_to_zero(item.get("videos", 0)),
                            "watch_number": clean_dash_to_zero(item.get("view", 0)),
                            "create_time": format_create_time(item.get("createtime")),
                            "cover_pic": item.get("pic"),
                            "index": item.get("index", 0),
                            "u_id": item.get("u_id"),
                            "u_xid": item.get("u_xid"),
                            "u_name": item.get("u_name"),
                            "u_pic": item.get("u_pic"),
                            "rn": kitem.get("rn"),
                            "batch": kitem['batch'],
                            "machine_id": MACHINE_ID,
                            "level": kitem['level'],
                        }
                        db.upsert_video(record)
                    db.flush()
                except Exception as e:
                    print(f"[Error] {str(e.__class__.__name__)}: {str(e)}")
                    print(f"[Error] Failed while processing keyword {kitem['keyword']}, rolling back...")
                    time.sleep(5)
                    remaining_payloads = [p for p, _ in keywords[index:]]
                    if flag == 2:
                        db.rollback(remaining_payloads)
                    elif flag == 1:
                        db.rollback_records(remaining_payloads)
                    time.sleep(5)
                    break


def parse_args() -> argparse.Namespace:
    """Parse -m/--machine-id and -w/--max-workers and update the module-level settings."""
    global MACHINE_ID, MAX_WORKERS
    parser = argparse.ArgumentParser(
        description="Configure worker settings."
    )
    parser.add_argument(
        "-m", "--machine-id",
        type=int,
        help=f"Machine identifier (default: {MACHINE_ID})"
    )
    parser.add_argument(
        "-w", "--max-workers",
        type=int,
        help=f"Maximum concurrent workers (default: {MAX_WORKERS})"
    )
    args = parser.parse_args()

    if args.machine_id is not None:
        MACHINE_ID = args.machine_id
    if args.max_workers is not None:
        if args.max_workers <= 0:
            parser.error("--max-workers must be greater than 0")
        MAX_WORKERS = args.max_workers
    if MACHINE_ID is None:
        raise ValueError("A machine id is required (pass -m/--machine-id)")
    return args


if __name__ == '__main__':
    parse_args()
    start_time = datetime.datetime.now()
    print(f"Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
    integrate_data()
    end_time = datetime.datetime.now()
    duration = end_time - start_time
    print(f"End time: {end_time.strftime('%Y-%m-%d %H:%M:%S')} (elapsed: {duration})")
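
# Example invocation (illustrative only — the script filename and the machine
# numbering below are assumptions, not part of this file):
#   python this_script.py --machine-id 0 --max-workers 10
# --machine-id tags every upserted record via the machine_id field so that
# concurrent workers can be told apart; --max-workers is parsed and stored in
# MAX_WORKERS but is only consumed if a concurrent.futures pool is wired up.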