diff --git a/onoe.py b/onoe.py
new file mode 100644
index 0000000..7c47966
--- /dev/null
+++ b/onoe.py
@@ -0,0 +1,511 @@
+import json
+import random
+import traceback
+from urllib.parse import quote, urlparse
+import argparse
+import time
+import uuid
+import concurrent.futures
+import requests
+import datetime
+from requests import RequestException
+from DB import DBVidcon, DBSA
+from dateutil import parser as date_parser
+import copy
+from threading import Lock
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from logger import logger
+import os
+import urllib3
+
+
+db = DBVidcon()
+MACHINE_ID = None
+MAX_WORKERS = 10
+executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
+# Desktop User-Agent pool; one entry is picked at random on every token refresh.
+UserAgent = [
+    'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50',
+    'Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1',
+    'Opera/9.80 (Windows NT 6.1; U; en) Presto/2.8.131 Version/11.11',
+    'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; SE 2.X MetaSr 1.0; SE 2.X MetaSr 1.0; .NET CLR 2.0.50727; SE 2.X MetaSr 1.0)',
+    'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; 360SE)',
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36',
+    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36',
+    'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.2669.400 QQBrowser/9.6.10990.400',
+    'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Maxthon 2.0)',
+    'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Maxthon/5.0.3.4000 Chrome/47.0.2526.73 Safari/537.36',
+    'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; The World)']
+
+
+def get_part_ids(part_num: int, take: int, offset: int = 0):
+    part_ids = list(range(offset, offset + take))
+    if max(part_ids) >= part_num:
+        raise ValueError(f"Shard id out of range: PART_IDS={part_ids} exceeds PART_NUM={part_num}")
+    next_offset = offset + take
+    if next_offset < part_num:
+        print(f"[hint] the next machine should start at offset {next_offset}")
+    else:
+        print("[hint] the current shards already reach the end, no further machine is needed")
+    return part_ids
+
+
+def clean_dash_to_zero(val):
+    if val in ('-', '', None):
+        return 0
+    try:
+        return int(val)
+    except (ValueError, TypeError) as e:
+        logger.exception(f"[bad field] val = {val} → {str(e)}")
+        return 0
+
+
+def format_create_time(timestr):
+    try:
+        dt = date_parser.isoparse(timestr)
+        return dt.strftime("%Y-%m-%d %H:%M:%S")
+    except Exception as e:
+        logger.exception(f"[bad timestamp] {timestr} → {str(e)}")
+        return "1970-01-01 00:00:00"
+
+
+def format_duration(seconds):
+    try:
+        seconds = int(seconds)
+        return f"{seconds // 60:02}:{seconds % 60:02}"
+    except Exception:
+        return "00:00"
+
+
+headers1 = {
+    'Accept': '*/*',
+    # 'Accept-Encoding': 'gzip, deflate, br',
+    'Cache-Control': 'no-cache',
+    'Connection': 'keep-alive',
+    # 'Content-Length': '8512',
+    'Content-Type': 'application/json',
+    'Host': 'graphql.api.dailymotion.com',
+    'Origin': 'https://www.dailymotion.com',
+    'Referer': 'https://www.dailymotion.com/',
+    'Sec-Fetch-Dest': 'empty',
+    'Sec-Fetch-Mode': 'cors',
+    'Sec-Fetch-Site': 'same-site',
+    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36',
+    'X-DM-AppInfo-Id': 'com.dailymotion.neon',
+    'X-DM-AppInfo-Type': 'website',
+    'X-DM-AppInfo-Version': 'v2025-05-26T13:45:05.666Z',
+    'X-DM-Neon-SSR': '0',
+    'X-DM-Preferred-Country': 'tw',
+    'accept-language': 'zh-CN',
+    'authorization': 'Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhaWQiOiJmMWEzNjJkMjg4YzFiOTgwOTljNyIsInJvbCI6ImNhbi1tYW5hZ2UtcGFydG5lcnMtcmVwb3J0cyBjYW4tcmVhZC12aWRlby1zdHJlYW1zIGNhbi1zcG9vZi1jb3VudHJ5IGNhbi1hZG9wdC11c2VycyBjYW4tcmVhZC1jbGFpbS1ydWxlcyBjYW4tbWFuYWdlLWNsYWltLXJ1bGVzIGNhbi1tYW5hZ2UtdXNlci1hbmFseXRpY3MgY2FuLXJlYWQtbXktdmlkZW8tc3RyZWFtcyBjYW4tZG93bmxvYWQtbXktdmlkZW9zIGFjdC1hcyBhbGxzY29wZXMgYWNjb3VudC1jcmVhdG9yIGNhbi1yZWFkLWFwcGxpY2F0aW9ucyIsInNjbyI6InJlYWQgd3JpdGUgZGVsZXRlIGVtYWlsIHVzZXJpbmZvIGZlZWQgbWFuYWdlX3ZpZGVvcyBtYW5hZ2VfY29tbWVudHMgbWFuYWdlX3BsYXlsaXN0cyBtYW5hZ2VfdGlsZXMgbWFuYWdlX3N1YnNjcmlwdGlvbnMgbWFuYWdlX2ZyaWVuZHMgbWFuYWdlX2Zhdm9yaXRlcyBtYW5hZ2VfbGlrZXMgbWFuYWdlX2dyb3VwcyBtYW5hZ2VfcmVjb3JkcyBtYW5hZ2Vfc3VidGl0bGVzIG1hbmFnZV9mZWF0dXJlcyBtYW5hZ2VfaGlzdG9yeSBpZnR0dCByZWFkX2luc2lnaHRzIG1hbmFnZV9jbGFpbV9ydWxlcyBkZWxlZ2F0ZV9hY2NvdW50X21hbmFnZW1lbnQgbWFuYWdlX2FuYWx5dGljcyBtYW5hZ2VfcGxheWVyIG1hbmFnZV9wbGF5ZXJzIG1hbmFnZV91c2VyX3NldHRpbmdzIG1hbmFnZV9jb2xsZWN0aW9ucyBtYW5hZ2VfYXBwX2Nvbm5lY3Rpb25zIG1hbmFnZV9hcHBsaWNhdGlvbnMgbWFuYWdlX2RvbWFpbnMgbWFuYWdlX3BvZGNhc3RzIiwibHRvIjoiY0c1Z1RocGRBbFIwVEVZeVhEVWNBMnNDTDFrUFFncDNRUTBNS3ciLCJhaW4iOjEsImFkZyI6MSwiaWF0IjoxNzQ4NTI0MDU5LCJleHAiOjE3NDg1NjAwMDcsImRtdiI6IjEiLCJhdHAiOiJicm93c2VyIiwiYWRhIjoid3d3LmRhaWx5bW90aW9uLmNvbSIsInZpZCI6IjY0NjMzRDAzMDY1RjQxODZBRDBCMDI3Q0Y3OTVFRjBGIiwiZnRzIjo5MTE0MSwiY2FkIjoyLCJjeHAiOjIsImNhdSI6Miwia2lkIjoiQUY4NDlERDczQTU4NjNDRDdEOTdEMEJBQjA3MjI0M0IifQ.h27sfMMETgt0xKhQvFAGIpwInouNj2sFLOeb1Y74Orc',
+    'sec-ch-ua': '"Chromium";v="128", "Not;A=Brand";v="24", "Google Chrome";v="128"',
+    'sec-ch-ua-mobile': '?0',
+    'sec-ch-ua-platform': '"Windows"',
+    'x-dm-visit-id': '1748480937099',
+    'x-dm-visitor-id': '1032a5f1-d07f-4bef-b96d-7783939abfc9',
+}
+_headers_cache = None  # most recent successful headers, reused as a fallback
+_cache_lock = Lock()
+Gproxies = None
+
+
+def get_proxies(g):
+    url = "https://www.kookeey.com/pickdynamicips"
+    params = {
+        "auth": "pwd",
+        "format": "1",
+        "n": "1",
+        "p": "http",
+        "gate": "sea",
+        "g": g,
+        "r": "0",
+        "type": "json",
+        "sign": "10099426b05c7119e9c4dbd6a7a0aa4e",
+        "accessid": "2207189",
+        "dl": ","
+    }
+    try:
+        response = requests.get(url, params=params)
+    except RequestException:
+        time.sleep(5)  # short pause so a flaky network does not turn into a tight retry loop
+        return get_proxies(g)
+    try:
+        proxy_data = response.json()['data'][0]
+    except Exception:
+        logger.exception(g)
+        logger.exception("failed to parse the proxy response: " + str(response.text))
+        time.sleep(5)
+        return get_proxies(g)
+    proxies_url = f"http://{proxy_data['username']}:{proxy_data['password']}@{proxy_data['ip']}:{proxy_data['port']}"
+    proxies = {
+        "http": proxies_url,
+        "https": proxies_url,
+    }
+    return proxies
+
+
+def post_with_retry(url, proxy_name, json_payload=None, data=None, headers=None,
+                    retries=5, timeout=10, backoff_factor=2, verbose=True):
+    token_refreshed = False
+    for attempt in range(1, retries + 1):
+        try:
+            proxy_str = db.get_proxy(proxy_name)
+
+            proxies = {"http": proxy_str, "https": proxy_str}
+
+            resp = requests.post(
+                url,
+                json=json_payload,
+                data=data,
+                headers=headers,
+                proxies=proxies,
+                timeout=timeout,
+            )
+            if resp.status_code == 401 and not token_refreshed:
+                if verbose:
+                    logger.info("[post_with_retry] got 401, refreshing the token and retrying")
+                gettoken(proxy_name)
+                token_refreshed = True
+                continue
+
+            resp.raise_for_status()
+            return resp
+
+        except RequestException as e:
+            if verbose:
+                logger.info(f"[{attempt}/{retries}] request failed: {e}")
+            # refresh the token once before the next attempt
+            if not token_refreshed:
+                if verbose:
+                    logger.info("[post_with_retry] refreshing the token and retrying")
+                gettoken(proxy_name)
+                token_refreshed = True
+                continue
+            if attempt == retries:
+                if verbose:
+                    logger.info(f"[post_with_retry] giving up: {url}")
+                return None
+
+            sleep_time = backoff_factor * (2 ** (attempt - 1))
+            if verbose:
+                logger.info(f"[post_with_retry] waiting {sleep_time}s before retrying…")
+            time.sleep(sleep_time)
+
+
+def gettoken(proxy, r=2):
+    global _headers_cache
+    headers = {
+        'Accept': '*/*',
+        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
+        'Cache-Control': 'no-cache',
+        'Connection': 'keep-alive',
+        'Content-Type': 'application/x-www-form-urlencoded',
+        'Origin': 'https://www.dailymotion.com',
+        'Pragma': 'no-cache',
+        'Referer': 'https://www.dailymotion.com/',
+        'Sec-Fetch-Dest': 'empty',
+        'Sec-Fetch-Mode': 'cors',
+        'Sec-Fetch-Site': 'same-site',
+        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36',
+        'sec-ch-ua': '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"',
+        'sec-ch-ua-mobile': '?0',
+        'sec-ch-ua-platform': '"Windows"',
+    }
+    u = uuid.uuid4()
+    uuid_with_dash = str(u)
+    # uuid_no_dash = u.hex
+    traffic_segment = str(random.randint(100_000, 999_999))
+    data = {
+        'client_id': 'f1a362d288c1b98099c7',
+        'client_secret': 'eea605b96e01c796ff369935357eca920c5da4c5',
+        'grant_type': 'client_credentials',
+        'traffic_segment': traffic_segment,
+        'visitor_id': uuid_with_dash,
+    }
+    try:
+        proxy_str = db.get_proxy(proxy)
+        url = 'https://graphql.api.dailymotion.com/oauth/token'
+        response = requests.post(url, headers=headers, data=data, proxies={"http": proxy_str, "https": proxy_str})
+        token = response.json()['access_token']
+        copy_headers = copy.deepcopy(headers1)
+        copy_headers['authorization'] = "Bearer " + token
+        copy_headers['x-dm-visit-id'] = str(int(time.time() * 1000))
+        copy_headers['x-dm-visitor-id'] = uuid_with_dash
+        copy_headers['User-Agent'] = UserAgent[random.randint(0, len(UserAgent) - 1)]
+        copy_headers['X-DM-Preferred-Country'] = proxy.lower()
+        with _cache_lock:
+            _headers_cache = copy_headers
+        return copy_headers
+    except Exception:
+        logger.exception("[gettoken] failed")
+        if r > 0:
+            time.sleep(5)
+            return gettoken(proxy, r - 1)
+        else:
+            with _cache_lock:
+                if _headers_cache:
+                    logger.info("[gettoken] falling back to the cached headers")
+                    return copy.deepcopy(_headers_cache)
+            # still nothing → return the header template (without a fresh Authorization)
+            return copy.deepcopy(headers1)
+
+
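+# Dailymotion's SEARCH_QUERY endpoint expects a reCAPTCHA v3 token, so every page request
+# first obtains one from the CapSolver API: createTask submits a ReCaptchaV3Task that runs
+# through the scraper's own proxy, and getTaskResult is polled until the solution is ready.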
+def solve_recaptcha_v3_with_proxy(
+        proxy_str: str,
+        keyword: str,
+        max_task_retries: int = 3,
+        polling_interval: float = 3,
+        max_poll_attempts: int = 5
+) -> str:
+    parsed = urlparse(proxy_str)
+
+    proxy_kwargs = {
+        "proxyType": parsed.scheme,
+        "proxyAddress": parsed.hostname,
+        "proxyPort": parsed.port
+    }
+    if parsed.username and parsed.password:
+        proxy_kwargs.update({
+            "proxyLogin": parsed.username,
+            "proxyPassword": parsed.password
+        })
+
+    create_url = "https://api.capsolver.com/createTask"
+    result_url = "https://api.capsolver.com/getTaskResult"
+    headers = {
+        "Content-Type": "application/json",
+        "Accept": "application/json"
+    }
+
+    last_error = None
+    for attempt in range(1, max_task_retries + 1):
+        try:
+            payload = {
+                "clientKey": "CAP-A76C932D4C6CCB3CA748F77FDC07D996",
+                "task": {
+                    "type": "ReCaptchaV3Task",
+                    "websiteURL": f"https://www.dailymotion.com/search/{keyword}/top-results",
+                    "websiteKey": "6LeOJBIrAAAAAPMIjyYvo-eN_9W1HDOkrEqHR8tM",
+                    "pageAction": "___grecaptcha_cfg.clients['100000']['L']['L']['promise-callback'](gRecaptchaResponse)",
+                    "minScore": 0.3,
+                    **proxy_kwargs
+                }
+            }
+            resp = requests.post(create_url, json=payload, headers=headers, timeout=30)
+            resp.raise_for_status()
+            task_id = resp.json()["taskId"]
+
+            # poll for the solver result
+            check_payload = {"clientKey": "CAP-A76C932D4C6CCB3CA748F77FDC07D996", "taskId": task_id}
+            for _ in range(max_poll_attempts):
+                r = requests.post(result_url, json=check_payload, headers=headers, timeout=10)
+                r.raise_for_status()
+                result = r.json()
+                if result.get("status") == "ready":
+                    return result["solution"]["token"]
+                time.sleep(polling_interval)
+
+            raise TimeoutError(f"task {task_id} not finished after {max_poll_attempts} polls")
+
+        except Exception as e:
+            last_error = e
+            if attempt < max_task_retries:
+                time.sleep(2)
+                continue
+            else:
+                break
+
+    raise Exception(f"failed to create or fetch the reCAPTCHA v3 token: {last_error}")
+
+
+def get_searchInfo(keyword, level, headers, proxy_name, r=2):
+    if r == 2:
+        logger.info(f"NET processing -> {keyword},\trn -> {proxy_name},\tlevel -> {level}")
+    video_list = []
+    max_page = 4
+    limit = 10
+    if level == 0 or level == 1:
+        max_page = 10
+        limit = 20
+    for j in range(1, max_page):
+        # Keep the GraphQL payload below on one long line; do not reflow it.
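+        # The payload below is the SEARCH_QUERY GraphQL request. Only shouldIncludeTopResults
+        # is enabled, so the response carries a single "stories" connection; the %s/%d
+        # placeholders are filled with the keyword, page number, page size and the freshly
+        # solved recaptchaToken just before the request is sent.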
+ proxy_str = db.get_proxy(proxy_name) + recaptchaToken = solve_recaptcha_v3_with_proxy(proxy_str, keyword) + data = ( + '{"operationName":"SEARCH_QUERY","variables":{"query":"%s","shouldIncludeTopResults":true,"shouldIncludeChannels":false,"shouldIncludePlaylists":false,"shouldIncludeHashtags":false,"shouldIncludeVideos":false,"shouldIncludeLives":false,"page":%d,"limit":%d,"recaptchaToken":"%s"},"query":"fragment VIDEO_BASE_FRAGMENT on Video {\\n id\\n xid\\n title\\n createdAt\\n duration\\n aspectRatio\\n thumbnail(height: PORTRAIT_240) {\\n id\\n url\\n __typename\\n }\\n creator {\\n id\\n xid\\n name\\n displayName\\n accountType\\n avatar(height: SQUARE_60) {\\n id\\n url\\n __typename\\n }\\n __typename\\n }\\n __typename\\n}\\n\\nfragment CHANNEL_BASE_FRAG on Channel {\\n id\\n xid\\n name\\n displayName\\n accountType\\n isFollowed\\n avatar(height: SQUARE_120) {\\n id\\n url\\n __typename\\n }\\n followerEngagement {\\n id\\n followDate\\n __typename\\n }\\n metrics {\\n id\\n engagement {\\n id\\n followers {\\n edges {\\n node {\\n id\\n total\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n __typename\\n}\\n\\nfragment PLAYLIST_BASE_FRAG on Collection {\\n id\\n xid\\n name\\n description\\n thumbnail(height: PORTRAIT_240) {\\n id\\n url\\n __typename\\n }\\n creator {\\n id\\n xid\\n name\\n displayName\\n accountType\\n avatar(height: SQUARE_60) {\\n id\\n url\\n __typename\\n }\\n __typename\\n }\\n metrics {\\n id\\n engagement {\\n id\\n videos(filter: {visibility: {eq: PUBLIC}}) {\\n edges {\\n node {\\n id\\n total\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n __typename\\n}\\n\\nfragment HASHTAG_BASE_FRAG on Hashtag {\\n id\\n xid\\n name\\n metrics {\\n id\\n engagement {\\n id\\n videos {\\n edges {\\n node {\\n id\\n total\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n __typename\\n}\\n\\nfragment LIVE_BASE_FRAGMENT on Live {\\n id\\n xid\\n title\\n audienceCount\\n aspectRatio\\n isOnAir\\n thumbnail(height: PORTRAIT_240) {\\n id\\n url\\n __typename\\n }\\n creator {\\n id\\n xid\\n name\\n displayName\\n accountType\\n avatar(height: SQUARE_60) {\\n id\\n url\\n __typename\\n }\\n __typename\\n }\\n __typename\\n}\\n\\nquery SEARCH_QUERY($query: String!, $shouldIncludeTopResults: Boolean!, $shouldIncludeVideos: Boolean!, $shouldIncludeChannels: Boolean!, $shouldIncludePlaylists: Boolean!, $shouldIncludeHashtags: Boolean!, $shouldIncludeLives: Boolean!, $page: Int, $limit: Int, $sortByVideos: SearchVideoSort, $durationMinVideos: Int, $durationMaxVideos: Int, $createdAfterVideos: DateTime, $recaptchaToken: String) {\\n search(token: $recaptchaToken) {\\n id\\n stories(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeTopResults) {\\n metadata {\\n id\\n algorithm {\\n uuid\\n __typename\\n }\\n __typename\\n }\\n pageInfo {\\n hasNextPage\\n nextPage\\n __typename\\n }\\n edges {\\n node {\\n ...VIDEO_BASE_FRAGMENT\\n ...CHANNEL_BASE_FRAG\\n ...PLAYLIST_BASE_FRAG\\n ...HASHTAG_BASE_FRAG\\n ...LIVE_BASE_FRAGMENT\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n videos(\\n query: $query\\n first: $limit\\n page: $page\\n sort: $sortByVideos\\n durationMin: $durationMinVideos\\n durationMax: $durationMaxVideos\\n createdAfter: $createdAfterVideos\\n ) @include(if: $shouldIncludeVideos) {\\n metadata {\\n id\\n algorithm {\\n uuid\\n __typename\\n }\\n __typename\\n }\\n pageInfo {\\n 
hasNextPage\\n nextPage\\n __typename\\n }\\n edges {\\n node {\\n id\\n ...VIDEO_BASE_FRAGMENT\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n lives(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeLives) {\\n metadata {\\n id\\n algorithm {\\n uuid\\n __typename\\n }\\n __typename\\n }\\n pageInfo {\\n hasNextPage\\n nextPage\\n __typename\\n }\\n edges {\\n node {\\n id\\n ...LIVE_BASE_FRAGMENT\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n channels(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeChannels) {\\n metadata {\\n id\\n algorithm {\\n uuid\\n __typename\\n }\\n __typename\\n }\\n pageInfo {\\n hasNextPage\\n nextPage\\n __typename\\n }\\n edges {\\n node {\\n id\\n ...CHANNEL_BASE_FRAG\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n playlists: collections(query: $query, first: $limit, page: $page) @include(if: $shouldIncludePlaylists) {\\n metadata {\\n id\\n algorithm {\\n uuid\\n __typename\\n }\\n __typename\\n }\\n pageInfo {\\n hasNextPage\\n nextPage\\n __typename\\n }\\n edges {\\n node {\\n id\\n ...PLAYLIST_BASE_FRAG\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n hashtags(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeHashtags) {\\n metadata {\\n id\\n algorithm {\\n uuid\\n __typename\\n }\\n __typename\\n }\\n pageInfo {\\n hasNextPage\\n nextPage\\n __typename\\n }\\n edges {\\n node {\\n id\\n ...HASHTAG_BASE_FRAG\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n __typename\\n }\\n}\\n\"}' % (
+                keyword,
+                j, limit, recaptchaToken)).encode()
+        response = post_with_retry(
+            "https://graphql.api.dailymotion.com/",
+            data=data,
+            headers=headers,
+            proxy_name=proxy_name
+        )
+        if response is None:
+            return None
+        jsondata = response.json()
+        try:
+            errors = jsondata.get("errors")  # GraphQL "errors" array
+            stories = jsondata.get("data", {}).get("search", {}).get("stories")
+
+            if errors or stories is None:  # errors present, or stories is null
+                if r == 0:
+                    logger.info("errors or an empty result three times in a row: " + json.dumps(jsondata, ensure_ascii=False))
+                    return None
+                time.sleep((3 - r) * 5)
+                return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
+            resinfo = stories["edges"]
+            logger.info(f"resinfo: {len(resinfo)}")
+        except Exception:
+            if r < 0:
+                logger.exception("[search API] unhandled response: " + response.text)
+                logger.exception("failed to parse the response fields!")
+                return None
+            else:
+                time.sleep((3 - r) * 5)
+                return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
+        for index, iteminfo in enumerate(resinfo):
+            calculated_index = index + 1 + (j - 1) * limit
+            node = iteminfo['node']
+            if node['__typename'] != "Video":
+                continue
+            creator = node['creator']
+            duration = node.get('duration')
+            if not duration or duration <= 300:  # skip videos of five minutes or less
+                continue
+            v_data = {
+                "index": calculated_index,
+                "v_id": node.get("id"),
+                "v_xid": node.get('xid'),
+                "link": "https://www.dailymotion.com/video/" + node.get('xid'),
+                "title": node.get("title"),
+                "createtime": node.get("createdAt"),
+                "duration": node.get("duration"),
+                "pic": node.get("thumbnail", {}).get("url"),
+                "view": 0,
+                "fans": 0,
+                "videos": 0,
+                "u_id": creator.get('id'),
+                "u_xid": creator.get('xid'),
+                "u_name": creator.get('name'),
+                "u_pic": node.get('thumbnail', {}).get('url')
+            }
+            video_list.append(v_data)
+        time.sleep(1)
+    return video_list
+
+
+proxiesdict = db.get_proxy_agent_dict()
+
+
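+# search_worker() is written for use with the module-level ThreadPoolExecutor, e.g.
+# (sketch only, not wired up in this version):
+#     futures = [executor.submit(search_worker, payload, kitem, flag) for payload, kitem in tasks]
+#     for fut in as_completed(futures):
+#         ok, flag, payload, kitem, v_list = fut.result()
+# integrate_data_parallel() below still processes each task sequentially.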
+def search_worker(payload, kitem, flag):
+    try:
+        gproxies = proxiesdict[kitem['rn']]
+        header = gettoken(gproxies)
+        v_list = get_searchInfo(kitem['keyword'], kitem['level'], header, gproxies)
+        if not v_list:
+            for i in range(2):
+                time.sleep(i * 5)
+                v_list = get_searchInfo(kitem['keyword'], kitem['level'], header, gproxies)
+                if v_list:
+                    break
+                time.sleep(2)
+            if not v_list:
+                v_list = []
+        return True, flag, payload, kitem, v_list  # success
+    except Exception as e:
+        logger.exception(f"[worker error] {kitem['keyword']} → {e}")
+        traceback.print_exc()
+        return False, flag, payload, kitem, []  # failure
+
+
+executor = concurrent.futures.ThreadPoolExecutor(MAX_WORKERS)
+
+
+def integrate_data_parallel():
+    global proxiesdict
+    while True:
+        proxiesdict = db.get_proxy_agent_dict()
+        tasks, flag = db.item_keyword()
+        if not tasks:
+            time.sleep(10)
+            continue
+
+        # Per-level payloads to push back to the queue. Assumption: a keyword whose search
+        # returned nothing is rolled back so it can be retried later.
+        rollback = [[], [], []]
+        for payload, kitem in tasks:
+            proxname = proxiesdict.get(kitem['rn'])
+            logger.info(proxname)
+            h = gettoken(proxname)
+            logger.info(h)
+            v_list = get_searchInfo(kitem['keyword'], kitem['level'], h, proxname)
+            if not v_list:
+                rollback[kitem["level"]].append(payload)
+                continue
+
+            for item in v_list:
+                DBSA.upsert_video({
+                    "keyword": kitem["keyword"],
+                    "v_name": kitem["v_name"],
+                    "v_id": item["v_id"],
+                    "v_xid": item["v_xid"],
+                    "link": item["link"],
+                    "title": item["title"],
+                    "duration": format_duration(item["duration"]),
+                    "fans": clean_dash_to_zero(item["fans"]),
+                    "videos": clean_dash_to_zero(item["videos"]),
+                    "watch_number": clean_dash_to_zero(item["view"]),
+                    "create_time": format_create_time(item["createtime"]),
+                    "cover_pic": item["pic"],
+                    "index": item["index"],
+                    "u_id": item["u_id"],
+                    "u_xid": item["u_xid"],
+                    "u_name": item["u_name"],
+                    "u_pic": item["u_pic"],
+                    "rn": kitem["rn"],
+                    "batch": kitem["batch"],
+                    "machine_id": MACHINE_ID,
+                    "level": kitem["level"],
+                })
+            DBSA.flush()
+        if rollback[0]:
+            db.rollback_l0(rollback[0])
+        if rollback[1]:
+            db.rollback_l1(rollback[1])
+        if rollback[2]:
+            db.rollback_l2(rollback[2])
+        time.sleep(10)
+
+
+def parse_args() -> argparse.Namespace:
+    global MACHINE_ID, MAX_WORKERS
+
+    parser = argparse.ArgumentParser(
+        description="Configure worker settings."
+    )
+    parser.add_argument(
+        "-m", "--machine-id",
+        type=int,
+        help=f"Machine identifier (default: {MACHINE_ID})"
+    )
+    parser.add_argument(
+        "-w", "--max-workers",
+        type=int,
+        help=f"Maximum concurrent workers (default: {MAX_WORKERS})"
+    )
+
+    args = parser.parse_args()
+
+    if args.machine_id is not None:
+        MACHINE_ID = args.machine_id
+
+    if args.max_workers is not None:
+        if args.max_workers <= 0:
+            parser.error("--max-workers must be a positive integer")
+        MAX_WORKERS = args.max_workers
+    if MACHINE_ID is None:
+        raise ValueError("a machine id must be provided via -m/--machine-id")
+    return args
+
+
+if __name__ == '__main__':
+    # parse_args()
+    start_time = datetime.datetime.now()
+    logger.info(f"started at {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
+    integrate_data_parallel()
+    end_time = datetime.datetime.now()
+    duration = end_time - start_time
+    logger.info(f"finished at {end_time.strftime('%Y-%m-%d %H:%M:%S')}, elapsed {duration}")