import argparse
import concurrent.futures
import datetime
import time
import uuid

import requests
from requests import RequestException
from dateutil import parser as date_parser

from DB import DBVidcon

batch = str(int(time.time()))
db = DBVidcon()
MACHINE_ID = None
MAX_WORKERS = 10


def get_part_ids(part_num: int, take: int, offset: int = 0):
    """Return the shard ids assigned to this machine and print the next offset."""
    part_ids = list(range(offset, offset + take))
    if max(part_ids) >= part_num:
        raise ValueError(f"Shard id out of range: PART_IDS={part_ids} exceeds PART_NUM={part_num}")
    next_offset = offset + take
    if next_offset < part_num:
        print(f"[hint] the next machine's offset should be: {next_offset}")
    else:
        print("[hint] current shards already reach the end; no more machines needed")
    return part_ids


def clean_dash_to_zero(val):
    """Normalize '-', '' and None placeholders to 0; fall back to 0 on bad input."""
    if val in ('-', '', None):
        return 0
    try:
        return int(val)
    except (ValueError, TypeError) as e:
        print(f"[field error] val = {val} → {e}")
        return 0


def format_create_time(timestr):
    """Convert an ISO-8601 timestamp into 'YYYY-MM-DD HH:MM:SS'."""
    try:
        dt = date_parser.isoparse(timestr)
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except Exception as e:
        print(f"[time format error] {timestr} → {e}")
        return "1970-01-01 00:00:00"


def format_duration(seconds):
    """Render a duration in seconds as MM:SS."""
    try:
        seconds = int(seconds)
        return f"{seconds // 60:02}:{seconds % 60:02}"
    except Exception:
        return "00:00"
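
# Quick sanity examples for the helpers above (values are illustrative only):
#   format_duration(125)                        -> "02:05"
#   clean_dash_to_zero('-')                     -> 0
#   format_create_time("2024-05-01T08:30:00Z")  -> "2024-05-01 08:30:00"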

headers1 = {
    'Accept': '*/*',
    # 'Accept-Encoding': 'gzip, deflate, br',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    # 'Content-Length': '6237',
    'Content-Type': 'application/json',
    'Host': 'graphql.api.dailymotion.com',
    'Origin': 'https://www.dailymotion.com',
    'Referer': 'https://www.dailymotion.com/',
    'Sec-Fetch-Dest': 'empty',
    'Sec-Fetch-Mode': 'cors',
    'Sec-Fetch-Site': 'same-site',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
    'X-DM-AppInfo-Id': 'com.dailymotion.neon',
    'X-DM-AppInfo-Type': 'website',
    'X-DM-AppInfo-Version': 'v2025-04-28T12:37:52.391Z',
    'X-DM-Neon-SSR': '0',
    'X-DM-Preferred-Country': 'us',
    'accept-language': 'zh-CN',
    # Stale bootstrap token; replaced at runtime by gettoken().
    'authorization': 'Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhaWQiOiJmMWEzNjJkMjg4YzFiOTgwOTljNyIsInJvbCI6ImNhbi1tYW5hZ2UtcGFydG5lcnMtcmVwb3J0cyBjYW4tcmVhZC12aWRlby1zdHJlYW1zIGNhbi1zcG9vZi1jb3VudHJ5IGNhbi1hZG9wdC11c2VycyBjYW4tcmVhZC1jbGFpbS1ydWxlcyBjYW4tbWFuYWdlLWNsYWltLXJ1bGVzIGNhbi1tYW5hZ2UtdXNlci1hbmFseXRpY3MgY2FuLXJlYWQtbXktdmlkZW8tc3RyZWFtcyBjYW4tZG93bmxvYWQtbXktdmlkZW9zIGFjdC1hcyBhbGxzY29wZXMgYWNjb3VudC1jcmVhdG9yIGNhbi1yZWFkLWFwcGxpY2F0aW9ucyIsInNjbyI6InJlYWQgd3JpdGUgZGVsZXRlIGVtYWlsIHVzZXJpbmZvIGZlZWQgbWFuYWdlX3ZpZGVvcyBtYW5hZ2VfY29tbWVudHMgbWFuYWdlX3BsYXlsaXN0cyBtYW5hZ2VfdGlsZXMgbWFuYWdlX3N1YnNjcmlwdGlvbnMgbWFuYWdlX2ZyaWVuZHMgbWFuYWdlX2Zhdm9yaXRlcyBtYW5hZ2VfbGlrZXMgbWFuYWdlX2dyb3VwcyBtYW5hZ2VfcmVjb3JkcyBtYW5hZ2Vfc3VidGl0bGVzIG1hbmFnZV9mZWF0dXJlcyBtYW5hZ2VfaGlzdG9yeSBpZnR0dCByZWFkX2luc2lnaHRzIG1hbmFnZV9jbGFpbV9ydWxlcyBkZWxlZ2F0ZV9hY2NvdW50X21hbmFnZW1lbnQgbWFuYWdlX2FuYWx5dGljcyBtYW5hZ2VfcGxheWVyIG1hbmFnZV9wbGF5ZXJzIG1hbmFnZV91c2VyX3NldHRpbmdzIG1hbmFnZV9jb2xsZWN0aW9ucyBtYW5hZ2VfYXBwX2Nvbm5lY3Rpb25zIG1hbmFnZV9hcHBsaWNhdGlvbnMgbWFuYWdlX2RvbWFpbnMgbWFuYWdlX3BvZGNhc3RzIiwibHRvIjoiZVdGV1JTSkdXRVZjVGg0eEYyRWpWblFlTHdrdUhTVjVPMGdrWGciLCJhaW4iOjEsImFkZyI6MSwiaWF0IjoxNzQ2MjU3NzI1LCJleHAiOjE3NDYyOTM1NjgsImRtdiI6IjEiLCJhdHAiOiJicm93c2VyIiwiYWRhIjoid3d3LmRhaWx5bW90aW9uLmNvbSIsInZpZCI6IjY0NjMzRDAzMDY1RjQxODZBRDBCMDI3Q0Y3OTVFRjBGIiwiZnRzIjo5MTE0MSwiY2FkIjoyLCJjeHAiOjIsImNhdSI6Miwia2lkIjoiQUY4NDlERDczQTU4NjNDRDdEOTdEMEJBQjA3MjI0M0IifQ.bMzShOLIb6datC92qGPTRVCW9eINTYDFwLtqed2P1d4',
    'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
    'x-dm-visit-id': '1745971699160',
    'x-dm-visitor-id': '64633D03065F4186AD0B027CF795EF0F',
}

Gproxies = None


def get_proxies(g):
    """Fetch one rotating proxy for region `g` from the kookeey API.

    Retries recursively until a proxy is successfully obtained.
    """
    url = "https://www.kookeey.com/pickdynamicips"
    params = {
        "auth": "pwd",
        "format": "1",
        "n": "1",
        "p": "http",
        "gate": "sea",
        "g": g,
        "r": "0",
        "type": "json",
        "sign": "10099426b05c7119e9c4dbd6a7a0aa4e",
        "accessid": "2207189",
        "dl": ",",
    }
    try:
        response = requests.get(url, params=params, timeout=10)
    except RequestException:
        return get_proxies(g)
    try:
        proxy_data = response.json()['data'][0]
    except Exception:
        print(g)
        print("Failed to parse proxy API response! " + str(response.text))
        time.sleep(5)
        return get_proxies(g)
    proxies_url = f"http://{proxy_data['username']}:{proxy_data['password']}@{proxy_data['ip']}:{proxy_data['port']}"
    return {
        "http": proxies_url,
        "https": proxies_url,
    }


def post_with_retry(url, json_payload=None, data=None, headers=None, proxies=None,
                    retries=5, timeout=10, backoff_factor=2, verbose=True):
    """POST with retries, exponential backoff, and a one-shot token refresh on 401.

    Returns the Response on success, or None once all retries are exhausted.
    """
    token_refreshed = False
    for attempt in range(1, retries + 1):
        try:
            # A fresh proxy is pulled from the DB on every attempt; the
            # `proxies` argument is intentionally overridden here.
            proxy_str = db.get_proxy(Gproxies)
            proxies = {"http": proxy_str, "https": proxy_str}
            resp = requests.post(
                url,
                json=json_payload,
                data=data,
                headers=headers,
                proxies=proxies,
                timeout=timeout
            )
            if resp.status_code == 401 and not token_refreshed:
                if verbose:
                    print("[post_with_retry] got 401; refreshing token and retrying")
                gettoken()
                token_refreshed = True
                continue
            resp.raise_for_status()
            return resp
        except RequestException as e:
            if verbose:
                print(f"[{attempt}/{retries}] request failed: {e}")
            # If the token has not been refreshed yet, refresh it once.
            if not token_refreshed:
                if verbose:
                    print("[post_with_retry] refreshing token before retrying")
                gettoken()
                token_refreshed = True
                continue
            if attempt == retries:
                if verbose:
                    print(f"[post_with_retry] giving up: {url}")
                return None
            sleep_time = backoff_factor * (2 ** (attempt - 1))
            if verbose:
                print(f"[post_with_retry] waiting {sleep_time}s before retrying…")
            time.sleep(sleep_time)


def gettoken():
    """Obtain a fresh client-credentials token and patch headers1 in place."""
    headers = {
        'host': 'graphql.api.dailymotion.com',
        'sec-ch-ua-platform': '"Windows"',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36 Edg/136.0.0.0',
        'sec-ch-ua': '"Chromium";v="136", "Microsoft Edge";v="136", "Not.A/Brand";v="99"',
        'content-type': 'application/x-www-form-urlencoded',
        'sec-ch-ua-mobile': '?0',
        'accept': '*/*',
        'origin': 'https://www.dailymotion.com',
        'sec-fetch-site': 'same-site',
        'sec-fetch-mode': 'cors',
        'sec-fetch-dest': 'empty',
        'referer': 'https://www.dailymotion.com/',
        'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
        'priority': 'u=1, i',
    }
    u = uuid.uuid4()
    uuid_with_dash = str(u)
    uuid_no_dash = u.hex
    data = {
        'client_id': 'f1a362d288c1b98099c7',
        'client_secret': 'eea605b96e01c796ff369935357eca920c5da4c5',
        'grant_type': 'client_credentials',
        'traffic_segment': '244677',
        'visitor_id': uuid_with_dash,
    }
    try:
        proxy_str = db.get_proxy(Gproxies)
        url = 'https://graphql.api.dailymotion.com/oauth/token'
        response = requests.post(url, headers=headers, data=data,
                                 proxies={"http": proxy_str, "https": proxy_str})
        token = response.json()['access_token']
        headers1['authorization'] = "Bearer " + token
        headers1['x-dm-visit-id'] = str(int(time.time() * 1000))
        headers1['x-dm-visitor-id'] = uuid_no_dash
    except Exception as e:
        print(str(e))
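
# Retry timing sketch for post_with_retry (defaults retries=5, backoff_factor=2):
# a failed attempt sleeps backoff_factor * 2**(attempt - 1) seconds (2s, 4s,
# 8s, 16s), while the first 401 or network error instead triggers one
# gettoken() refresh and retries immediately. After the final attempt the
# function returns None, so callers must check for None before calling .json().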

def get_searchInfo(keyword):
    video_list = []
    for j in range(1, 3):  # two pages of 100 results each -- do not unroll = = !
        data = {
            "operationName": "SEARCH_QUERY",
            "variables": {
                "query": keyword,
                "shouldIncludeTopResults": True,
                "shouldIncludeChannels": False,
                "shouldIncludePlaylists": False,
                "shouldIncludeHashtags": False,
                "shouldIncludeVideos": False,
                "shouldIncludeLives": False,
                "page": j,
                "limit": 100,
                "recaptchaToken": None
            },
            "query": """
fragment VIDEO_BASE_FRAGMENT on Video {
  id
  xid
  title
  createdAt
  duration
  aspectRatio
  thumbnail(height: PORTRAIT_240) { id url __typename }
  creator {
    id
    xid
    name
    displayName
    accountType
    avatar(height: SQUARE_60) { id url __typename }
    __typename
  }
  __typename
}

fragment CHANNEL_BASE_FRAG on Channel {
  id
  xid
  name
  displayName
  accountType
  isFollowed
  avatar(height: SQUARE_120) { id url __typename }
  followerEngagement { id followDate __typename }
  metrics {
    id
    engagement {
      id
      followers { edges { node { id total __typename } __typename } __typename }
      __typename
    }
    __typename
  }
  __typename
}

fragment PLAYLIST_BASE_FRAG on Collection {
  id
  xid
  name
  description
  thumbnail(height: PORTRAIT_240) { id url __typename }
  creator {
    id
    xid
    name
    displayName
    accountType
    avatar(height: SQUARE_60) { id url __typename }
    __typename
  }
  metrics {
    id
    engagement {
      id
      videos(filter: {visibility: {eq: PUBLIC}}) {
        edges { node { id total __typename } __typename }
        __typename
      }
      __typename
    }
    __typename
  }
  __typename
}

fragment HASHTAG_BASE_FRAG on Hashtag {
  id
  xid
  name
  metrics {
    id
    engagement {
      id
      videos { edges { node { id total __typename } __typename } __typename }
      __typename
    }
    __typename
  }
  __typename
}

fragment LIVE_BASE_FRAGMENT on Live {
  id
  xid
  title
  audienceCount
  aspectRatio
  isOnAir
  thumbnail(height: PORTRAIT_240) { id url __typename }
  creator {
    id
    xid
    name
    displayName
    accountType
    avatar(height: SQUARE_60) { id url __typename }
    __typename
  }
  __typename
}

query SEARCH_QUERY($query: String!, $shouldIncludeTopResults: Boolean!, $shouldIncludeVideos: Boolean!, $shouldIncludeChannels: Boolean!, $shouldIncludePlaylists: Boolean!, $shouldIncludeHashtags: Boolean!, $shouldIncludeLives: Boolean!, $page: Int, $limit: Int, $sortByVideos: SearchVideoSort, $durationMinVideos: Int, $durationMaxVideos: Int, $createdAfterVideos: DateTime, $recaptchaToken: String) {
  search(token: $recaptchaToken) {
    id
    stories(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeTopResults) {
      metadata { id algorithm { uuid __typename } __typename }
      pageInfo { hasNextPage nextPage __typename }
      edges {
        node {
          ...VIDEO_BASE_FRAGMENT
          ...CHANNEL_BASE_FRAG
          ...PLAYLIST_BASE_FRAG
          ...HASHTAG_BASE_FRAG
          ...LIVE_BASE_FRAGMENT
          __typename
        }
        __typename
      }
      __typename
    }
    videos(
      query: $query
      first: $limit
      page: $page
      sort: $sortByVideos
      durationMin: $durationMinVideos
      durationMax: $durationMaxVideos
      createdAfter: $createdAfterVideos
    ) @include(if: $shouldIncludeVideos) {
      metadata { id algorithm { uuid __typename } __typename }
      pageInfo { hasNextPage nextPage __typename }
      edges { node { id ...VIDEO_BASE_FRAGMENT __typename } __typename }
      __typename
    }
    lives(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeLives) {
      metadata { id algorithm { uuid __typename } __typename }
      pageInfo { hasNextPage nextPage __typename }
      edges { node { id ...LIVE_BASE_FRAGMENT __typename } __typename }
      __typename
    }
    channels(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeChannels) {
      metadata { id algorithm { uuid __typename } __typename }
      pageInfo { hasNextPage nextPage __typename }
      edges { node { id ...CHANNEL_BASE_FRAG __typename } __typename }
      __typename
    }
    playlists: collections(query: $query, first: $limit, page: $page) @include(if: $shouldIncludePlaylists) {
      metadata { id algorithm { uuid __typename } __typename }
      pageInfo { hasNextPage nextPage __typename }
      edges { node { id ...PLAYLIST_BASE_FRAG __typename } __typename }
      __typename
    }
    hashtags(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeHashtags) {
      metadata { id algorithm { uuid __typename } __typename }
      pageInfo { hasNextPage nextPage __typename }
      edges { node { id ...HASHTAG_BASE_FRAG __typename } __typename }
      __typename
    }
    __typename
  }
}
"""
        }
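
        # Only `stories` is requested on this page: every other shouldInclude*
        # flag above is False, so each page yields at most `limit` (100) mixed
        # nodes and the two-page loop caps a keyword at roughly 200 candidates.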
        gettoken()
        response = post_with_retry(
            "https://graphql.api.dailymotion.com/",
            json_payload=data,
            headers=headers1,
            proxies=None
        )
        if response is None:
            # post_with_retry exhausted its retries; skip this page.
            continue
        jsondata = response.json()
        try:
            resinfo = jsondata['data']['search']['stories']['edges']
            print('resinfo :', len(resinfo))
        except Exception:
            resinfo = []
            print(response.text)
            print("Failed to parse search response fields!")
        video_tasks = []
        for index, iteminfo in enumerate(resinfo):
            # Global ranking across pages: 100 results per page.
            calculated_index = index + 1 + (j - 1) * 100
            node = iteminfo['node']
            if node['__typename'] != "Video":
                continue
            creator = node['creator']
            video_tasks.append({
                "index": calculated_index,
                "xid": node.get('xid'),
                "node": node,
                "creator": creator,
            })

        def safe_fetch(task, max_try=2):
            """Call fetch_video_detail; after max_try failures return zeroed stats."""
            attempt = 0
            while attempt < max_try:
                try:
                    return fetch_video_detail(task)
                except Exception as e:
                    attempt += 1
                    print(f"[thread error] fetch failed for {task['xid']}: {e}")
            node = task["node"]
            creator = task["creator"]
            avatar = creator.get("avatar", {})
            return {
                "index": task["index"],
                "v_id": node.get("id"),
                "v_xid": task["xid"],
                "link": "https://www.dailymotion.com/video/" + task["xid"],
                "title": node.get("title"),
                "createtime": node.get("createdAt"),
                "duration": node.get("duration"),
                "pic": node.get("thumbnail", {}).get("url"),
                "view": 0,
                "fans": 0,
                "videos": 0,
                "u_id": creator.get('id'),
                "u_xid": creator.get('xid'),
                "u_name": creator.get('name'),
                "u_pic": avatar.get('url'),
                "_region": Gproxies,
            }

        with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            results = list(executor.map(safe_fetch, video_tasks))
        for result in results:
            if result:
                video_list.append(result)
    return video_list


def fetch_video_detail(task):
    """Combine the search-result node with per-video channel stats."""
    xid = task["xid"]
    v_info = get_videoInfo(xid)
    node = task["node"]
    creator = task["creator"]
    avatar = creator.get("avatar", {})
    return {
        "index": task["index"],
        "v_id": node.get("id"),
        "v_xid": xid,
        "link": "https://www.dailymotion.com/video/" + xid,
        "title": node.get("title"),
        "createtime": node.get("createdAt"),
        "duration": node.get("duration"),
        "pic": node.get("thumbnail", {}).get("url"),
        "view": v_info['view'],
        "fans": v_info['fans'],
        "videos": v_info['videos'],
        "u_id": creator.get('id'),
        "u_xid": creator.get('xid'),
        "u_name": creator.get('name'),
        "u_pic": avatar.get('url'),
    }
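
# Note: executor.map() yields results in task-submission order, so each
# record's "index" (its search ranking) stays aligned with its task even
# though the detail requests run concurrently.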

def get_videoInfo(x_id, r=3):
    """Fetch channel stats (views / followers / videos) for one video xid.

    Retries the whole query up to `r` times before returning '-' placeholders.
    """
    payload = {
        "operationName": "WATCHING_VIDEO",
        "variables": {
            "xid": x_id,
            "isSEO": False
        },
        "query": """
fragment VIDEO_FRAGMENT on Video {
  id
  xid
  isPublished
  duration
  title
  description
  thumbnailx60: thumbnailURL(size: "x60")
  thumbnailx120: thumbnailURL(size: "x120")
  thumbnailx240: thumbnailURL(size: "x240")
  thumbnailx360: thumbnailURL(size: "x360")
  thumbnailx480: thumbnailURL(size: "x480")
  thumbnailx720: thumbnailURL(size: "x720")
  thumbnailx1080: thumbnailURL(size: "x1080")
  aspectRatio
  category
  categories(filter: {category: {eq: CONTENT_CATEGORY}}) {
    edges { node { id name slug __typename } __typename }
    __typename
  }
  iab_categories: categories(filter: {category: {eq: IAB_CATEGORY}, percentage: {gte: 70}}) {
    edges { node { id slug __typename } __typename }
    __typename
  }
  bestAvailableQuality
  createdAt
  viewerEngagement { id liked favorited __typename }
  isPrivate
  isWatched
  isCreatedForKids
  isExplicit
  canDisplayAds
  videoWidth: width
  videoHeight: height
  status
  hashtags { edges { node { id name __typename } __typename } __typename }
  stats { id views { id total __typename } __typename }
  channel {
    __typename
    id
    xid
    name
    displayName
    isArtist
    logoURLx25: logoURL(size: "x25")
    logoURL(size: "x60")
    isFollowed
    accountType
    coverURLx375: coverURL(size: "x375")
    stats {
      id
      views { id total __typename }
      followers { id total __typename }
      videos { id total __typename }
      __typename
    }
    country { id codeAlpha2 __typename }
    organization @skip(if: $isSEO) { id xid owner { id xid __typename } __typename }
  }
  language { id codeAlpha2 __typename }
  tags { edges { node { id label __typename } __typename } __typename }
  moderation { id reviewedAt __typename }
  topics(whitelistedOnly: true, first: 3, page: 1) {
    edges {
      node {
        id
        xid
        name
        names {
          edges {
            node { id name language { id codeAlpha2 __typename } __typename }
            __typename
          }
          __typename
        }
        __typename
      }
      __typename
    }
    __typename
  }
  geoblockedCountries { id allowed denied __typename }
  transcript { edges { node { id timecode text __typename } __typename } __typename }
  __typename
}

fragment LIVE_FRAGMENT on Live {
  id
  xid
  startAt
  endAt
  isPublished
  title
  description
  thumbnailx60: thumbnailURL(size: "x60")
  thumbnailx120: thumbnailURL(size: "x120")
  thumbnailx240: thumbnailURL(size: "x240")
  thumbnailx360: thumbnailURL(size: "x360")
  thumbnailx480: thumbnailURL(size: "x480")
  thumbnailx720: thumbnailURL(size: "x720")
  thumbnailx1080: thumbnailURL(size: "x1080")
  aspectRatio
  category
  createdAt
  viewerEngagement { id liked favorited __typename }
  isPrivate
  isExplicit
  isCreatedForKids
  bestAvailableQuality
  canDisplayAds
  videoWidth: width
  videoHeight: height
  stats { id views { id total __typename } __typename }
  channel {
    __typename
    id
    xid
    name
    displayName
    isArtist
    logoURLx25: logoURL(size: "x25")
    logoURL(size: "x60")
    isFollowed
    accountType
    coverURLx375: coverURL(size: "x375")
    stats { id views { id total __typename } followers { id total __typename } videos { id total __typename } __typename }
    country { id codeAlpha2 __typename }
    organization @skip(if: $isSEO) { id xid owner { id xid __typename } __typename }
  }
  language { id codeAlpha2 __typename }
  tags { edges { node { id label __typename } __typename } __typename }
  moderation { id reviewedAt __typename }
  topics(whitelistedOnly: true, first: 3, page: 1) {
    edges { node { id xid name names { edges { node { id name language { id codeAlpha2 __typename } __typename } __typename } __typename } __typename } __typename }
    __typename
  }
  geoblockedCountries { id allowed denied __typename }
  __typename
}

query WATCHING_VIDEO($xid: String!, $isSEO: Boolean!) {
  video: media(xid: $xid) {
    __typename
    ... on Video { id ...VIDEO_FRAGMENT __typename }
    ... on Live { id ...LIVE_FRAGMENT __typename }
  }
}
"""
    }
    url = 'https://graphql.api.dailymotion.com/'
    response = post_with_retry(
        url,
        json_payload=payload,
        headers=headers1,
        proxies=None,
    )
    # Treat an exhausted-retries None response like a parse failure below.
    jsondata = response.json() if response is not None else {}
    try:
        v_info = jsondata['data']['video']['channel']['stats']
    except Exception:
        if r > 0:
            return get_videoInfo(x_id=x_id, r=r - 1)
        return {
            "view": '-',
            "fans": '-',
            "videos": '-',
        }
    return {
        "view": v_info['views']['total'],
        "fans": v_info['followers']['total'],
        "videos": v_info['videos']['total'],
    }
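
# Shape sketch of get_videoInfo()'s result: on success the totals are ints,
# e.g. {"view": 1234, "fans": 56, "videos": 78} (illustrative numbers); once
# all retries fail it returns '-' placeholders, which clean_dash_to_zero()
# later normalizes back to 0.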

def integrate_data():
    """Main work loop: pull keyword batches from the DB, search, enrich, upsert."""
    global Gproxies
    while True:
        keywords, flag = db.item_keyword()
        if len(keywords) < 1:
            time.sleep(30)
            continue
        for index, (payload, kitem) in enumerate(keywords):
            try:
                Gproxies = kitem['rn']
                v_list = get_searchInfo(kitem['keyword'])
                if not v_list:
                    # Retry with growing pauses before giving up on this keyword.
                    for i in range(3):
                        time.sleep(i * 5)
                        v_list = get_searchInfo(kitem["keyword"])
                        if v_list:
                            break
                time.sleep(2)
                for item in v_list:
                    record = {
                        "keyword": kitem.get("keyword"),
                        "v_name": kitem.get("v_name"),
                        "v_id": item.get("v_id"),
                        "v_xid": item.get("v_xid"),
                        "link": item.get("link"),
                        "title": item.get("title"),
                        "duration": format_duration(item.get("duration")),
                        "fans": clean_dash_to_zero(item.get("fans", 0)),
                        "videos": clean_dash_to_zero(item.get("videos", 0)),
                        "watch_number": clean_dash_to_zero(item.get("view", 0)),
                        "create_time": format_create_time(item.get("createtime")),
                        "cover_pic": item.get("pic"),
                        "index": item.get("index", 0),
                        "u_id": item.get("u_id"),
                        "u_xid": item.get("u_xid"),
                        "u_name": item.get("u_name"),
                        "u_pic": item.get("u_pic"),
                        "rn": kitem.get("rn"),
                        "batch": kitem['batch'],
                        "machine_id": MACHINE_ID,
                        "level": kitem['level'],
                    }
                    db.upsert_video(record)
                db.flush()
            except Exception as e:
                print(f"[error] {e.__class__.__name__}: {e}")
                print(f"[error] while processing keyword {kitem['keyword']}; rolling back...")
                time.sleep(5)
                # Requeue the unprocessed payloads so another run can pick them up.
                remaining_payloads = [p for p, _ in keywords[index:]]
                if flag == 2:
                    db.rollback(remaining_payloads)
                elif flag == 1:
                    db.rollback_records(remaining_payloads)
                time.sleep(5)
                break


def parse_args() -> argparse.Namespace:
    global MACHINE_ID, MAX_WORKERS
    parser = argparse.ArgumentParser(description="Configure worker settings.")
    parser.add_argument("-m", "--machine-id", type=int,
                        help=f"Machine identifier (default: {MACHINE_ID})")
    parser.add_argument("-w", "--max-workers", type=int,
                        help=f"Maximum concurrent workers (default: {MAX_WORKERS})")
    args = parser.parse_args()
    if args.machine_id is not None:
        MACHINE_ID = args.machine_id
    if args.max_workers is not None:
        if args.max_workers <= 0:
            parser.error("--max-workers must be a positive integer")
        MAX_WORKERS = args.max_workers
    if MACHINE_ID is None:
        raise ValueError("A machine id is required (pass -m/--machine-id)")
    return args


if __name__ == '__main__':
    parse_args()
    start_time = datetime.datetime.now()
    print(f"Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
    integrate_data()
    end_time = datetime.datetime.now()
    duration = end_time - start_time
    print(f"Total runtime: {duration}")
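
# Example invocation (the script filename is illustrative):
#   python dm_search.py --machine-id 1 --max-workers 10
# -m/--machine-id is required (MACHINE_ID has no default); -w/--max-workers
# must be a positive integer and defaults to 10.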