import argparse
import base64
import concurrent.futures
import copy
import json
import random
import time
import uuid
from datetime import datetime
from threading import Lock

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from dateutil import parser as date_parser

from DB import DBVidcon, DBSA

MACHINE_ID = 0
db = DBVidcon()
proxiesdict = db.get_proxy_agent_dict()


class RetryRequests:
    """Thin wrapper around requests.Session that applies automatic retries and
    a default timeout/proxies to every call."""

    def __init__(
            self,
            proxies: dict = None,
            timeout: int = 10,
            total: int = 3,
            backoff_factor: float = 1.0,
            status_forcelist: tuple = (500, 502, 503, 504),
            allowed_methods: tuple = ("GET", "POST"),
    ):
        self.session = requests.Session()
        self.timeout = timeout
        self.proxies = proxies
        retry = Retry(
            total=total,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            allowed_methods=allowed_methods,
            raise_on_status=False,
        )
        adapter = HTTPAdapter(max_retries=retry)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

    def get(self, url, **kwargs):
        kwargs.setdefault("timeout", self.timeout)
        if self.proxies:
            kwargs.setdefault("proxies", self.proxies)
        return self.session.get(url, **kwargs)

    def post(self, url, **kwargs):
        kwargs.setdefault("timeout", self.timeout)
        if self.proxies:
            kwargs.setdefault("proxies", self.proxies)
        return self.session.post(url, **kwargs)


req = RetryRequests()


def clean_dash_to_zero(val):
    """Normalize placeholder values ('-', '', None) and unparsable input to 0."""
    if val in ('-', '', None):
        return 0
    try:
        return int(val)
    except (ValueError, TypeError) as e:
        print(f"[field error] val = {val} → {e}")
        return 0


def format_create_time(timestr):
    """Parse an ISO-8601 timestamp into 'YYYY-MM-DD HH:MM:SS'."""
    try:
        dt = date_parser.isoparse(timestr)
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except Exception as e:
        print(f"[time format error] {timestr} → {e}")
        return "1970-01-01 00:00:00"


def format_duration(seconds):
    """Render a duration in seconds as MM:SS."""
    try:
        seconds = int(seconds)
        return f"{seconds // 60:02}:{seconds % 60:02}"
    except Exception:
        return "00:00"
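
# Usage sketch (illustrative values): the helpers above normalize raw API
# fields before they are written to the DB, e.g.
#   format_duration(95)                        -> "01:35"
#   clean_dash_to_zero("-")                    -> 0
#   format_create_time("2024-05-01T12:00:00Z") -> "2024-05-01 12:00:00"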


class DMHeaderManager:
    """Builds authenticated GraphQL headers: fetches an anonymous OAuth token
    with a fresh visitor/visit id and caches the last header set."""

    _headers_template = {
        'Accept': '*/*',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Content-Type': 'application/json',
        'Host': 'graphql.api.dailymotion.com',
        'Origin': 'https://www.dailymotion.com',
        'Referer': 'https://www.dailymotion.com/',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-site',
        'User-Agent': 'Mozilla/5.0',
        'X-DM-AppInfo-Id': 'com.dailymotion.neon',
        'X-DM-AppInfo-Type': 'website',
        'X-DM-AppInfo-Version': 'v2025-05-26T13:45:05.666Z',
        'X-DM-Neon-SSR': '0',
        'X-DM-Preferred-Country': 'tw',
        'accept-language': 'zh-CN',
        'authorization': '',
        'sec-ch-ua': '"Chromium";v="128", "Not;A=Brand";v="24", "Google Chrome";v="128"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'x-dm-visit-id': '',
        'x-dm-visitor-id': '',
    }

    _user_agents = [
        'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50',
        'Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1',
        'Opera/9.80 (Windows NT 6.1; U; en) Presto/2.8.131 Version/11.11',
        'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; 360SE)',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36',
    ]

    def __init__(self, proxies: dict = None):
        self._headers_cache = None
        self._cache_lock = Lock()
        self._proxies = proxies

    def get_headers(self, retry: int = 2) -> dict:
        visitor_id = str(uuid.uuid4())
        visit_id = str(int(time.time() * 1000))
        traffic_segment = str(random.randint(100_000, 999_999))
        ua = random.choice(self._user_agents)

        token_headers = {
            'Accept': '*/*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'Content-Type': 'application/x-www-form-urlencoded',
            'Origin': 'https://www.dailymotion.com',
            'Pragma': 'no-cache',
            'Referer': 'https://www.dailymotion.com/',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-site',
            'User-Agent': ua,
            'sec-ch-ua': '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
        }

        data = {
            'client_id': 'f1a362d288c1b98099c7',
            'client_secret': 'eea605b96e01c796ff369935357eca920c5da4c5',
            'grant_type': 'client_credentials',
            'traffic_segment': traffic_segment,
            'visitor_id': visitor_id,
        }

        response = req.post(
            'https://graphql.api.dailymotion.com/oauth/token',
            headers=token_headers,
            data=data,
            proxies=self._proxies,
            timeout=10,
        )
        response.raise_for_status()
        token = response.json()['access_token']

        new_headers = copy.deepcopy(self._headers_template)
        new_headers['authorization'] = f'Bearer {token}'
        new_headers['x-dm-visit-id'] = visit_id
        new_headers['x-dm-visitor-id'] = visitor_id
        new_headers['User-Agent'] = ua

        with self._cache_lock:
            self._headers_cache = copy.deepcopy(new_headers)

        return new_headers


class DMVideoInfo:
    """Hydrates a search hit with full video/owner metadata from the REST API
    and upserts it via DBSA."""

    def __init__(self, proxies: dict = None, max_retries: int = 3, backoff_factor: float = 0.5):
        self.proxies = proxies
        # Retry knobs are kept for compatibility; actual retrying is handled
        # by the shared RetryRequests session.
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor

    def get_video_info(self, data: dict) -> dict:
        v_xid = data.get('v_xid')
        url = f'https://api.dailymotion.com/video/{v_xid}'
        params = {
            'fields': 'id,title,created_time,thumbnail_240_url,duration,'
                      'owner.id,owner.screenname,likes_total,views_total,'
                      'owner.avatar_60_url,owner.followers_total,owner.videos_total'
        }
        try:
            resp = req.get(url, params=params, proxies=self.proxies, timeout=10)
            resp.raise_for_status()
            r_data = resp.json()
            xid = r_data["id"]
            vid = base64.b64encode(f"Video:{xid}".encode('utf-8')).decode('utf-8')
            uxid = r_data["owner.id"]
            uid = base64.b64encode(f"Channel:{uxid}".encode('utf-8')).decode('utf-8')
            duration = r_data.get("duration", 0)
            if duration < 30:
                # Skip clips shorter than 30 seconds.
                return None
            data["v_id"] = vid
            data["title"] = r_data.get("title", "")
            data["link"] = "https://www.dailymotion.com/video/" + xid
            data["duration"] = format_duration(duration)
            data['create_time'] = datetime.fromtimestamp(
                r_data.get("created_time")).strftime("%Y-%m-%d %H:%M:%S")
            data['fans'] = clean_dash_to_zero(r_data.get("owner.followers_total", 0))
            data['videos'] = clean_dash_to_zero(r_data.get("owner.videos_total", 0))
            data['watch_number'] = clean_dash_to_zero(r_data.get("views_total", 0))
            data['cover_pic'] = r_data.get('thumbnail_240_url')
            data['u_id'] = uid
            data['u_xid'] = uxid
            data['u_name'] = r_data.get("owner.screenname", "")
            data['u_pic'] = r_data.get("owner.avatar_60_url", "")
            DBSA.upsert_video(data)
            DBSA.flush()
            return data
        except requests.RequestException as e:
            print(f"[ERROR] request failed vxid={v_xid} : {e}")
            return None
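

# Note on the encoded ids above: Dailymotion's GraphQL layer uses Relay-style
# global ids, so get_video_info wraps the REST xid before storing it, e.g.
#   base64.b64encode(b"Video:x8abcd") -> b"VmlkZW86eDhhYmNk"
# ("x8abcd" is a made-up xid; the same scheme derives u_id from "Channel:<xid>").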


def parse_args() -> argparse.Namespace:
    global MACHINE_ID
    parser = argparse.ArgumentParser(
        description="Configure worker settings."
    )
    parser.add_argument(
        "-m", "--machine-id",
        type=int,
        help=f"Machine identifier (default: {MACHINE_ID})"
    )
    args = parser.parse_args()
    if args.machine_id is not None:
        MACHINE_ID = args.machine_id
    if MACHINE_ID is None or MACHINE_ID == 0:
        raise ValueError("Please specify a machine id (-m/--machine-id)")
    return args


def main():
    while True:
        kwdata = db.get_web_items()
        if not kwdata:
            print("No keyword data fetched")
            time.sleep(30)
            continue
        print(f"Search keyword data: {kwdata}")
        kwdata = kwdata[0][1]
        rn = kwdata['rn']
        proxy_name = proxiesdict.get(rn)
        # proxies_str = "http://127.0.0.1:10808"
        proxies_str = db.get_proxy(proxy_name, '-1')
        proxies = {
            'http': proxies_str,
            'https': proxies_str,
        }
        kw = kwdata['keyword']
        dmheader_manager = DMHeaderManager(proxies=proxies)
        dmvideo_info = DMVideoInfo(proxies=proxies)
        headers = dmheader_manager.get_headers()
        for i in range(1, 11):
            data = {
                "operationName": "SEARCH_QUERY",
                "variables": {
                    "query": kw,
                    "shouldIncludeTopResults": True,   # include top results
                    "shouldIncludeChannels": False,    # include channels
                    "shouldIncludePlaylists": False,   # include playlists
                    "shouldIncludeHashtags": False,    # include hashtags
                    "shouldIncludeVideos": False,      # include videos
                    "shouldIncludeLives": False,       # include live streams
                    "page": i,
                    "limit": 20,
                    "recaptchaToken": None
                },
                "query": """
                fragment VIDEO_BASE_FRAGMENT on Video {
                  id xid title createdAt duration aspectRatio
                  thumbnail(height: PORTRAIT_240) { id url __typename }
                  creator {
                    id xid name displayName accountType
                    avatar(height: SQUARE_60) { id url __typename }
                    __typename
                  }
                  __typename
                }

                fragment CHANNEL_BASE_FRAG on Channel {
                  id xid name displayName accountType isFollowed
                  avatar(height: SQUARE_120) { id url __typename }
                  followerEngagement { id followDate __typename }
                  metrics {
                    id
                    engagement {
                      id
                      followers {
                        edges { node { id total __typename } __typename }
                        __typename
                      }
                      __typename
                    }
                    __typename
                  }
                  __typename
                }

                fragment PLAYLIST_BASE_FRAG on Collection {
                  id xid name description
                  thumbnail(height: PORTRAIT_240) { id url __typename }
                  creator {
                    id xid name displayName accountType
                    avatar(height: SQUARE_60) { id url __typename }
                    __typename
                  }
                  metrics {
                    id
                    engagement {
                      id
                      videos(filter: {visibility: {eq: PUBLIC}}) {
                        edges { node { id total __typename } __typename }
                        __typename
                      }
                      __typename
                    }
                    __typename
                  }
                  __typename
                }

                fragment HASHTAG_BASE_FRAG on Hashtag {
                  id xid name
                  metrics {
                    id
                    engagement {
                      id
                      videos {
                        edges { node { id total __typename } __typename }
                        __typename
                      }
                      __typename
                    }
                    __typename
                  }
                  __typename
                }

                fragment LIVE_BASE_FRAGMENT on Live {
                  id xid title audienceCount aspectRatio isOnAir
                  thumbnail(height: PORTRAIT_240) { id url __typename }
                  creator {
                    id xid name displayName accountType
                    avatar(height: SQUARE_60) { id url __typename }
                    __typename
                  }
                  __typename
                }

                query SEARCH_QUERY(
                  $query: String!,
                  $shouldIncludeTopResults: Boolean!,
                  $shouldIncludeVideos: Boolean!,
                  $shouldIncludeChannels: Boolean!,
                  $shouldIncludePlaylists: Boolean!,
                  $shouldIncludeHashtags: Boolean!,
                  $shouldIncludeLives: Boolean!,
                  $page: Int,
                  $limit: Int,
                  $sortByVideos: SearchVideoSort,
                  $durationMinVideos: Int,
                  $durationMaxVideos: Int,
                  $createdAfterVideos: DateTime,
                  $recaptchaToken: String
                ) {
                  search(token: $recaptchaToken) {
                    id
                    stories(query: $query, first: $limit, page: $page)
                        @include(if: $shouldIncludeTopResults) {
                      metadata { id algorithm { uuid __typename } __typename }
                      pageInfo { hasNextPage nextPage __typename }
                      edges {
                        node {
                          ...VIDEO_BASE_FRAGMENT
                          ...CHANNEL_BASE_FRAG
                          ...PLAYLIST_BASE_FRAG
                          ...HASHTAG_BASE_FRAG
                          ...LIVE_BASE_FRAGMENT
                          __typename
                        }
                        __typename
                      }
                      __typename
                    }
                    videos(
                      query: $query, first: $limit, page: $page, sort: $sortByVideos,
                      durationMin: $durationMinVideos, durationMax: $durationMaxVideos,
                      createdAfter: $createdAfterVideos
                    ) @include(if: $shouldIncludeVideos) {
                      metadata { id algorithm { uuid __typename } __typename }
                      pageInfo { hasNextPage nextPage __typename }
                      edges { node { id ...VIDEO_BASE_FRAGMENT __typename } __typename }
                      __typename
                    }
                    lives(query: $query, first: $limit, page: $page)
                        @include(if: $shouldIncludeLives) {
                      metadata { id algorithm { uuid __typename } __typename }
                      pageInfo { hasNextPage nextPage __typename }
                      edges { node { id ...LIVE_BASE_FRAGMENT __typename } __typename }
                      __typename
                    }
                    channels(query: $query, first: $limit, page: $page)
                        @include(if: $shouldIncludeChannels) {
                      metadata { id algorithm { uuid __typename } __typename }
                      pageInfo { hasNextPage nextPage __typename }
                      edges { node { id ...CHANNEL_BASE_FRAG __typename } __typename }
                      __typename
                    }
                    playlists: collections(query: $query, first: $limit, page: $page)
                        @include(if: $shouldIncludePlaylists) {
                      metadata { id algorithm { uuid __typename } __typename }
                      pageInfo { hasNextPage nextPage __typename }
                      edges { node { id ...PLAYLIST_BASE_FRAG __typename } __typename }
                      __typename
                    }
                    hashtags(query: $query, first: $limit, page: $page)
                        @include(if: $shouldIncludeHashtags) {
                      metadata { id algorithm { uuid __typename } __typename }
                      pageInfo { hasNextPage nextPage __typename }
                      edges { node { id ...HASHTAG_BASE_FRAG __typename } __typename }
                      __typename
                    }
                    __typename
                  }
                }
                """
            }
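            # Only the `stories` connection is actually requested here
            # (shouldIncludeTopResults is True, all other include flags are
            # False), paged 20 results at a time; a page shorter than `limit`
            # is treated as the last page below.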
            payload = json.dumps(data).encode()
            response = req.post('https://graphql.api.dailymotion.com/',
                                headers=headers, data=payload, proxies=proxies)
            data = response.json()
            try:
                edges = data['data']['search']['stories']['edges']
            except (TypeError, KeyError):
                print("stories is None or malformed, skipping")
                break
            edges_len = len(edges)
            print(f"Page {i}, keyword: {kw}, got {edges_len} items")
            tasks = []
            for j, edge in enumerate(edges):
                node = edge.get("node", {})
                s_data = {
                    "keyword": kw,
                    "v_name": kwdata.get("v_name", ""),
                    "v_xid": node.get("xid"),
                    "batch": kwdata.get("batch"),
                    "rn": kwdata.get("rn"),
                    "machine_id": MACHINE_ID,
                    "index": (i - 1) * 20 + j + 1,
                    "level": 0,
                }
                tasks.append(s_data)
            with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
                executor.map(dmvideo_info.get_video_info, tasks)
            if edges_len < 20:
                break
            time.sleep(5)
        time.sleep(10)


if __name__ == '__main__':
    parse_args()
    start_time = datetime.now()
    print(f"Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
    main()
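
# Invocation sketch (assumed deployment; the file name is hypothetical): each
# worker is started with a unique, non-zero machine id, e.g.
#   python dm_worker.py -m 3
# parse_args() raises ValueError when the id is missing or 0, so the flag is
# effectively required.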