import json
import random
import traceback
from urllib.parse import quote, urlparse
import argparse
import time
import uuid
import concurrent.futures
import requests
import datetime
from requests import RequestException
from DB import DBVidcon, DBSA
from dateutil import parser as date_parser
import copy
from threading import Lock
from concurrent.futures import ThreadPoolExecutor, as_completed
from logger import logger
import os
import urllib3

db = DBVidcon()
MACHINE_ID = None
MAX_WORKERS = 10
# Created at import time with the default MAX_WORKERS; parse_args() changing
# MAX_WORKERS later does not resize this pool.
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)

# Pool of User-Agent values; gettoken() picks one at random per token.
# NOTE(review): several entries start with a literal "User-Agent," prefix,
# which looks like a copy/paste artifact -- confirm before cleaning them up.
UserAgent = [
    'User-Agent,Mozilla/5.0 (Windows; U; Windows NT 6.1; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50',
    'User-Agent,Mozilla/5.0 (Windows NT 6.1; rv,2.0.1) Gecko/20100101 Firefox/4.0.1',
    'User-Agent,Opera/9.80 (Windows NT 6.1; U; en) Presto/2.8.131 Version/11.11',
    'User-Agent, Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; SE 2.X MetaSr 1.0; SE 2.X MetaSr 1.0; .NET CLR 2.0.50727; SE 2.X MetaSr 1.0)',
    'User-Agent, Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; 360SE)',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.2669.400 QQBrowser/9.6.10990.400',
    'User-Agent, Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Maxthon 2.0)',
    'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Maxthon/5.0.3.4000 Chrome/47.0.2526.73 Safari/537.36',
    'User-Agent, Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; The World)',
]


def get_part_ids(part_num: int, take: int, offset: int = 0):
    """Return the shard ids ``offset .. offset + take - 1`` for this machine.

    Prints a hint telling the operator which offset the next machine should
    use, or that the shards are fully covered.

    Raises:
        ValueError: if ``take`` is not positive or the range exceeds
            ``part_num``.
    """
    if take <= 0:
        # range() would be empty and max() would fail with an opaque message.
        raise ValueError(f"take 必须为正整数,当前 take={take}")
    part_ids = list(range(offset, offset + take))
    if max(part_ids) >= part_num:
        raise ValueError(f"分片编号超出范围,PART_IDS={part_ids} 超过 PART_NUM={part_num}")
    next_offset = offset + take
    if next_offset < part_num:
        print(f"[提示] 下一台机器 offset 应该为: {next_offset}")
    else:
        print(f"[提示] 当前分片已经覆盖至末尾,无需更多机器")
    return part_ids


def clean_dash_to_zero(val):
    """Convert ``val`` to int, mapping '-', '' and None (or bad input) to 0."""
    if val in ('-', '', None):
        return 0
    try:
        return int(val)
    except (ValueError, TypeError) as e:
        logger.exception(f"[字段异常] val = {val} → {str(e)}")
        return 0


def format_create_time(timestr):
    """Format an ISO-8601 timestamp as ``YYYY-MM-DD HH:MM:SS``.

    Returns the epoch string ``1970-01-01 00:00:00`` when parsing fails.
    """
    try:
        dt = date_parser.isoparse(timestr)
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except Exception as e:
        logger.exception(f"[时间格式错误] {timestr} → {str(e)}")
        return "1970-01-01 00:00:00"


def format_duration(seconds):
    """Format a number of seconds as ``MM:SS``; return ``00:00`` on bad input."""
    try:
        seconds = int(seconds)
    except (TypeError, ValueError, OverflowError):
        return "00:00"
    return f"{seconds // 60:02}:{seconds % 60:02}"


# Template request headers for the GraphQL endpoint. gettoken() deep-copies
# this dict and overwrites the authorization, visit/visitor ids, User-Agent
# and preferred-country entries. Accept-Encoding and Content-Length are
# intentionally not set here.
# NOTE(review): the bearer token below is a hard-coded credential; it is only
# used as a last-resort fallback when gettoken() cannot obtain a fresh one.
headers1 = {
    'Accept': '*/*, */*',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Content-Type': 'application/json, application/json',
    'Host': 'graphql.api.dailymotion.com',
    'Origin': 'https://www.dailymotion.com',
    'Referer': 'https://www.dailymotion.com/',
    'Sec-Fetch-Dest': 'empty',
    'Sec-Fetch-Mode': 'cors',
    'Sec-Fetch-Site': 'same-site',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36',
    'X-DM-AppInfo-Id': 'com.dailymotion.neon',
    'X-DM-AppInfo-Type': 'website',
    'X-DM-AppInfo-Version': 'v2025-05-26T13:45:05.666Z',
    'X-DM-Neon-SSR': '0',
    'X-DM-Preferred-Country': 'tw',
    'accept-language': 'zh-CN',
    'authorization': 'Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhaWQiOiJmMWEzNjJkMjg4YzFiOTgwOTljNyIsInJvbCI6ImNhbi1tYW5hZ2UtcGFydG5lcnMtcmVwb3J0cyBjYW4tcmVhZC12aWRlby1zdHJlYW1zIGNhbi1zcG9vZi1jb3VudHJ5IGNhbi1hZG9wdC11c2VycyBjYW4tcmVhZC1jbGFpbS1ydWxlcyBjYW4tbWFuYWdlLWNsYWltLXJ1bGVzIGNhbi1tYW5hZ2UtdXNlci1hbmFseXRpY3MgY2FuLXJlYWQtbXktdmlkZW8tc3RyZWFtcyBjYW4tZG93bmxvYWQtbXktdmlkZW9zIGFjdC1hcyBhbGxzY29wZXMgYWNjb3VudC1jcmVhdG9yIGNhbi1yZWFkLWFwcGxpY2F0aW9ucyIsInNjbyI6InJlYWQgd3JpdGUgZGVsZXRlIGVtYWlsIHVzZXJpbmZvIGZlZWQgbWFuYWdlX3ZpZGVvcyBtYW5hZ2VfY29tbWVudHMgbWFuYWdlX3BsYXlsaXN0cyBtYW5hZ2VfdGlsZXMgbWFuYWdlX3N1YnNjcmlwdGlvbnMgbWFuYWdlX2ZyaWVuZHMgbWFuYWdlX2Zhdm9yaXRlcyBtYW5hZ2VfbGlrZXMgbWFuYWdlX2dyb3VwcyBtYW5hZ2VfcmVjb3JkcyBtYW5hZ2Vfc3VidGl0bGVzIG1hbmFnZV9mZWF0dXJlcyBtYW5hZ2VfaGlzdG9yeSBpZnR0dCByZWFkX2luc2lnaHRzIG1hbmFnZV9jbGFpbV9ydWxlcyBkZWxlZ2F0ZV9hY2NvdW50X21hbmFnZW1lbnQgbWFuYWdlX2FuYWx5dGljcyBtYW5hZ2VfcGxheWVyIG1hbmFnZV9wbGF5ZXJzIG1hbmFnZV91c2VyX3NldHRpbmdzIG1hbmFnZV9jb2xsZWN0aW9ucyBtYW5hZ2VfYXBwX2Nvbm5lY3Rpb25zIG1hbmFnZV9hcHBsaWNhdGlvbnMgbWFuYWdlX2RvbWFpbnMgbWFuYWdlX3BvZGNhc3RzIiwibHRvIjoiY0c1Z1RocGRBbFIwVEVZeVhEVWNBMnNDTDFrUFFncDNRUTBNS3ciLCJhaW4iOjEsImFkZyI6MSwiaWF0IjoxNzQ4NTI0MDU5LCJleHAiOjE3NDg1NjAwMDcsImRtdiI6IjEiLCJhdHAiOiJicm93c2VyIiwiYWRhIjoid3d3LmRhaWx5bW90aW9uLmNvbSIsInZpZCI6IjY0NjMzRDAzMDY1RjQxODZBRDBCMDI3Q0Y3OTVFRjBGIiwiZnRzIjo5MTE0MSwiY2FkIjoyLCJjeHAiOjIsImNhdSI6Miwia2lkIjoiQUY4NDlERDczQTU4NjNDRDdEOTdEMEJBQjA3MjI0M0IifQ.h27sfMMETgt0xKhQvFAGIpwInouNj2sFLOeb1Y74Orc',
    'sec-ch-ua': '"Chromium";v="128", "Not;A=Brand";v="24", "Google Chrome";v="128"',
    'sec-ch-ua-mobile': '?0',
    'sec-ch-ua-platform': '"Windows"',
    'x-dm-visit-id': '1748480937099',
    'x-dm-visitor-id': '1032a5f1-d07f-4bef-b96d-7783939abfc9',
}

_headers_cache = None  # headers from the most recent successful gettoken()
_cache_lock = Lock()
Gproxies = None


def get_proxies(g):
    """Fetch one dynamic proxy for region ``g`` and return a requests proxies dict.

    Retries indefinitely, pausing 5 seconds after every failed request or
    unparseable response, until the provider returns a usable proxy.
    """
    url = "https://www.kookeey.com/pickdynamicips"
    params = {
        "auth": "pwd",
        "format": "1",
        "n": "1",
        "p": "http",
        "gate": "sea",
        "g": g,
        "r": "0",
        "type": "json",
        "sign": "10099426b05c7119e9c4dbd6a7a0aa4e",
        "accessid": "2207189",
        "dl": ",",
    }
    # Iterative retry: the recursive form could exhaust the stack when the
    # provider is unreachable for a long time.
    while True:
        try:
            response = requests.get(url, params=params, timeout=30)
        except RequestException as e:
            logger.info(f"[get_proxies] 请求失败: {e}")
            time.sleep(5)
            continue
        try:
            proxy_data = response.json()['data'][0]
        except Exception:
            logger.exception(g)
            logger.exception("数据返回解析错误!" + str(response.text))
            time.sleep(5)
            continue
        proxies_url = (
            f"http://{proxy_data['username']}:{proxy_data['password']}"
            f"@{proxy_data['ip']}:{proxy_data['port']}"
        )
        return {
            "http": proxies_url,
            "https": proxies_url,
        }


def post_with_retry(url, proxy_name, json_payload=None, data=None, headers=None,
                    retries=5, timeout=10, backoff_factor=2, verbose=True):
    """POST ``url`` through the named proxy, retrying with exponential backoff.

    The token is refreshed at most once per call: on the first 401 response
    or on the first request failure. The refreshed headers replace
    ``headers`` for the remaining attempts.

    Returns:
        The successful ``requests.Response``, or ``None`` once all attempts
        are used up.
    """
    token_refreshed = False
    for attempt in range(1, retries + 1):
        try:
            proxy_str = db.get_proxy(proxy_name)
            proxies = {"http": proxy_str, "https": proxy_str}
            resp = requests.post(
                url,
                json=json_payload,
                data=data,
                headers=headers,
                proxies=proxies,
                timeout=timeout,
            )
            if resp.status_code == 401 and not token_refreshed:
                if verbose:
                    logger.info("[post_with_retry] 收到 401,刷新 token 后重试")
                # Use the fresh headers; resending the old ones would 401 again.
                headers = gettoken(proxy_name)
                token_refreshed = True
                continue
            resp.raise_for_status()
            return resp
        except RequestException as e:
            if verbose:
                logger.info(f"[{attempt}/{retries}] 请求失败: {e}")
            # Refresh the token once if that has not happened yet.
            if not token_refreshed:
                if verbose:
                    logger.info("[post_with_retry] 刷新 token 后再试")
                headers = gettoken(proxy_name)
                token_refreshed = True
                continue
            if attempt == retries:
                if verbose:
                    logger.info(f"[post_with_retry] 最终失败:{url}")
                return None
            sleep_time = backoff_factor * (2 ** (attempt - 1))
            if verbose:
                logger.info(f"[post_with_retry] 等待 {sleep_time}s 后重试…")
            time.sleep(sleep_time)
    # Reached when the last attempt was spent on a token refresh.
    return None


def gettoken(proxy, r=2):
    """Obtain an OAuth token and return ready-to-use GraphQL request headers.

    Args:
        proxy: proxy name passed to ``db.get_proxy``; its lower-cased value is
            also sent as ``X-DM-Preferred-Country``.
        r: remaining retries (5 second pause between attempts).

    Returns:
        A new headers dict. When every attempt fails, a copy of the last
        successful headers if there is one, otherwise a copy of ``headers1``.
    """
    global _headers_cache
    headers = {
        'Accept': '*/*',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Content-Type': 'application/x-www-form-urlencoded',
        'Origin': 'https://www.dailymotion.com',
        'Pragma': 'no-cache',
        'Referer': 'https://www.dailymotion.com/',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-site',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36',
        'sec-ch-ua': '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
    }
    uuid_with_dash = str(uuid.uuid4())
    traffic_segment = str(random.randint(100_000, 999_999))
    # NOTE(review): client_id / client_secret are hard-coded credentials.
    data = {
        'client_id': 'f1a362d288c1b98099c7',
        'client_secret': 'eea605b96e01c796ff369935357eca920c5da4c5',
        'grant_type': 'client_credentials',
        'traffic_segment': traffic_segment,
        'visitor_id': uuid_with_dash,
    }
    try:
        proxy_str = db.get_proxy(proxy)
        url = 'https://graphql.api.dailymotion.com/oauth/token'
        response = requests.post(
            url,
            headers=headers,
            data=data,
            proxies={"http": proxy_str, "https": proxy_str},
            timeout=30,  # a hung token request would otherwise block the worker
        )
        token = response.json()['access_token']
        copy_headers = copy.deepcopy(headers1)
        copy_headers['authorization'] = "Bearer " + token
        copy_headers['x-dm-visit-id'] = str(int(time.time() * 1000))
        copy_headers['x-dm-visitor-id'] = uuid_with_dash
        copy_headers['User-Agent'] = random.choice(UserAgent)
        copy_headers['X-DM-Preferred-Country'] = proxy.lower()
        with _cache_lock:
            _headers_cache = copy_headers
        return copy_headers
    except Exception as e:
        logger.exception(f"[gettoken] 失败:{e}")
        if r > 0:
            time.sleep(5)
            return gettoken(proxy, r - 1)
        with _cache_lock:
            if _headers_cache:
                logger.info("[gettoken] 用缓存 headers 兜底")
                return copy.deepcopy(_headers_cache)
        # Nothing cached either: fall back to the static template.
        return copy.deepcopy(headers1)


def solve_recaptcha_v3_with_proxy(
        keyword: str,
        max_task_retries: int = 3,
        polling_interval: float = 3,
        max_poll_attempts: int = 10
) -> str:
    """Solve the search page's reCAPTCHA v3 via capsolver and return the token.

    A task is created for the search URL of ``keyword`` and polled up to
    ``max_poll_attempts`` times; the whole create-and-poll cycle is retried
    up to ``max_task_retries`` times.

    Raises:
        Exception: when no token could be obtained; the last underlying
            error is attached as the cause.
    """
    create_url = "https://api.capsolver.com/createTask"
    result_url = "https://api.capsolver.com/getTaskResult"
    # NOTE(review): hard-coded API key.
    client_key = "CAP-A76C932D4C6CCB3CA748F77FDC07D996"
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

    last_error = None
    for attempt in range(1, max_task_retries + 1):
        try:
            encoded_query = quote(keyword, safe="")
            payload = {
                "clientKey": client_key,
                "task": {
                    "type": "ReCaptchaV3Task",
                    "websiteURL": f"https://www.dailymotion.com/search/{encoded_query}/top-results",
                    "websiteKey": "6LeOJBIrAAAAAPMIjyYvo-eN_9W1HDOkrEqHR8tM",
                    "pageAction": "___grecaptcha_cfg.clients['100000']['L']['L']['promise-callback'](gRecaptchaResponse)",
                    "minScore": 0.5
                }
            }
            resp = requests.post(create_url, json=payload, headers=headers, timeout=30)
            # Log only the task description so the API key stays out of the logs.
            logger.info(f"[token] 发送 payload:{payload['task']}")
            resp.raise_for_status()
            task_id = resp.json()["taskId"]
            logger.info(f"task_id: {task_id}")

            # Poll for the result.
            check_payload = {"clientKey": client_key, "taskId": task_id}
            for i in range(max_poll_attempts):
                poll = requests.post(result_url, json=check_payload, headers=headers, timeout=10)
                poll.raise_for_status()
                result = poll.json()
                logger.info(f"第{i}次,task_id:{task_id},结果:{result}")
                if result.get("status") == "ready":
                    return result["solution"]["token"]
                time.sleep(polling_interval)
            raise TimeoutError(f"任务 {task_id} 在轮询 {max_poll_attempts} 次后未完成")
        except Exception as e:
            last_error = e
            if attempt < max_task_retries:
                time.sleep(2)

    raise RuntimeError(f"创建或获取 reCAPTCHA v3 token 失败: {last_error}") from last_error


# --- GraphQL search document ------------------------------------------------
# The document is assembled from shared pieces because the same selections
# (thumbnail, creator, metrics, paging metadata) repeat many times.

_THUMBNAIL_SEL = " thumbnail(height: PORTRAIT_240) {\n id\n url\n __typename\n }\n"

_CREATOR_SEL = (
    " creator {\n id\n xid\n name\n displayName\n accountType\n"
    " avatar(height: SQUARE_60) {\n id\n url\n __typename\n }\n"
    " __typename\n }\n"
)

_PAGE_META_SEL = (
    " metadata {\n id\n algorithm {\n uuid\n __typename\n }\n __typename\n }\n"
    " pageInfo {\n hasNextPage\n nextPage\n __typename\n }\n"
)


def _metrics_selection(counter_field: str) -> str:
    """Return the ``metrics { engagement { <counter_field> ... } }`` selection."""
    return (
        " metrics {\n id\n engagement {\n id\n " + counter_field + " {\n"
        " edges {\n node {\n id\n total\n __typename\n }\n"
        " __typename\n }\n"   # closes edges
        " __typename\n }\n"   # closes the counter field
        " __typename\n }\n"   # closes engagement
        " __typename\n }\n"   # closes metrics
    )


def _paged_selection(head: str, node_fields: str) -> str:
    """Return one paged search section: paging metadata plus ``edges { node }``."""
    return (
        head + " {\n" + _PAGE_META_SEL +
        " edges {\n node {\n" + node_fields +
        " __typename\n }\n"   # closes node
        " __typename\n }\n"   # closes edges
        " __typename\n }\n"   # closes the section
    )


_SEARCH_QUERY = (
    "fragment VIDEO_BASE_FRAGMENT on Video {\n id\n xid\n title\n createdAt\n duration\n aspectRatio\n"
    + _THUMBNAIL_SEL + _CREATOR_SEL +
    " __typename\n}\n\n"
    "fragment CHANNEL_BASE_FRAG on Channel {\n id\n xid\n name\n displayName\n accountType\n isFollowed\n"
    " avatar(height: SQUARE_120) {\n id\n url\n __typename\n }\n"
    " followerEngagement {\n id\n followDate\n __typename\n }\n"
    + _metrics_selection("followers") +
    " __typename\n}\n\n"
    "fragment PLAYLIST_BASE_FRAG on Collection {\n id\n xid\n name\n description\n"
    + _THUMBNAIL_SEL + _CREATOR_SEL
    + _metrics_selection("videos(filter: {visibility: {eq: PUBLIC}})") +
    " __typename\n}\n\n"
    "fragment HASHTAG_BASE_FRAG on Hashtag {\n id\n xid\n name\n"
    + _metrics_selection("videos") +
    " __typename\n}\n\n"
    "fragment LIVE_BASE_FRAGMENT on Live {\n id\n xid\n title\n audienceCount\n aspectRatio\n isOnAir\n"
    + _THUMBNAIL_SEL + _CREATOR_SEL +
    " __typename\n}\n\n"
    "query SEARCH_QUERY($query: String!, $shouldIncludeTopResults: Boolean!, "
    "$shouldIncludeVideos: Boolean!, $shouldIncludeChannels: Boolean!, "
    "$shouldIncludePlaylists: Boolean!, $shouldIncludeHashtags: Boolean!, "
    "$shouldIncludeLives: Boolean!, $page: Int, $limit: Int, "
    "$sortByVideos: SearchVideoSort, $durationMinVideos: Int, $durationMaxVideos: Int, "
    "$createdAfterVideos: DateTime, $recaptchaToken: String) {\n"
    " search(token: $recaptchaToken) {\n id\n"
    + _paged_selection(
        " stories(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeTopResults)",
        " ...VIDEO_BASE_FRAGMENT\n ...CHANNEL_BASE_FRAG\n ...PLAYLIST_BASE_FRAG\n"
        " ...HASHTAG_BASE_FRAG\n ...LIVE_BASE_FRAGMENT\n",
    )
    + _paged_selection(
        " videos(\n query: $query\n first: $limit\n page: $page\n sort: $sortByVideos\n"
        " durationMin: $durationMinVideos\n durationMax: $durationMaxVideos\n"
        " createdAfter: $createdAfterVideos\n ) @include(if: $shouldIncludeVideos)",
        " id\n ...VIDEO_BASE_FRAGMENT\n",
    )
    + _paged_selection(
        " lives(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeLives)",
        " id\n ...LIVE_BASE_FRAGMENT\n",
    )
    + _paged_selection(
        " channels(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeChannels)",
        " id\n ...CHANNEL_BASE_FRAG\n",
    )
    + _paged_selection(
        " playlists: collections(query: $query, first: $limit, page: $page) @include(if: $shouldIncludePlaylists)",
        " id\n ...PLAYLIST_BASE_FRAG\n",
    )
    + _paged_selection(
        " hashtags(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeHashtags)",
        " id\n ...HASHTAG_BASE_FRAG\n",
    )
    + " __typename\n }\n}\n"
)


def _build_search_body(keyword, page, limit, recaptcha_token) -> bytes:
    """Serialize the SEARCH_QUERY request body as compact UTF-8 JSON."""
    body = {
        "operationName": "SEARCH_QUERY",
        "variables": {
            "query": keyword,
            "shouldIncludeTopResults": True,
            "shouldIncludeChannels": False,
            "shouldIncludePlaylists": False,
            "shouldIncludeHashtags": False,
            "shouldIncludeVideos": False,
            "shouldIncludeLives": False,
            "page": page,
            "limit": limit,
            "recaptchaToken": recaptcha_token,
        },
        "query": _SEARCH_QUERY,
    }
    # json.dumps escapes quotes/backslashes in the keyword, which plain string
    # interpolation into a JSON template did not.
    return json.dumps(body, ensure_ascii=False, separators=(",", ":")).encode()


def _extract_story_edges(response):
    """Return ``data.search.stories.edges`` from a response, or None if unusable.

    The reason for a None result (GraphQL errors, null stories, malformed
    body) is logged here.
    """
    try:
        jsondata = response.json()
        errors = jsondata.get("errors")  # GraphQL "errors" array
        stories = jsondata.get("data", {}).get("search", {}).get("stories")
        if errors or stories is None:
            logger.info(f"连续 3 次错误或空结果:{json.dumps(jsondata, ensure_ascii=False)}")
            return None
        return stories["edges"]
    except Exception:
        logger.exception(f"[搜索接口] 未知:未处理 {response.text}")
        logger.exception("返回字段解析错误!")
        return None


def get_searchInfo(keyword, level, headers, proxy_name, r=2):
    """Search Dailymotion for ``keyword`` and collect videos longer than 300s.

    Pages ``1 .. max_page - 1`` are fetched (``max_page`` is 10 with 20 results
    per page for level 0/1, otherwise 4 with 10 per page). A fresh reCAPTCHA
    token is solved for every page.

    Args:
        keyword: search text.
        level: keyword priority level; selects the page depth.
        headers: request headers as produced by ``gettoken``.
        proxy_name: proxy name forwarded to ``post_with_retry``.
        r: remaining whole-search retries. A retry restarts from page 1.

    Returns:
        A list of video dicts, or ``None`` when a page request failed or the
        retries were exhausted.
    """
    if r == 2:
        logger.info(f"NET处理->{keyword},\trn->{proxy_name},\tlevel->{level}")
    video_list = []
    max_page = 4
    limit = 10
    if level == 0 or level == 1:
        max_page = 10
        limit = 20
    for j in range(1, max_page):
        recaptcha_token = solve_recaptcha_v3_with_proxy(keyword)
        data = _build_search_body(keyword, j, limit, recaptcha_token)
        response = post_with_retry(
            "https://graphql.api.dailymotion.com/",
            data=data,
            headers=headers,
            proxy_name=proxy_name
        )
        if response is None:
            return None

        resinfo = _extract_story_edges(response)
        if resinfo is None:
            # `<=` rather than `==`: the counter must never step past zero,
            # otherwise the retry recursion would have no stopping point.
            if r <= 0:
                return None
            time.sleep((3 - r) * 5)
            return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
        logger.info(f"resinfo: {len(resinfo)}")

        for index, iteminfo in enumerate(resinfo):
            calculated_index = index + 1 + (j - 1) * limit
            node = iteminfo['node']
            if node['__typename'] != "Video":
                continue
            duration = node.get('duration')
            # A missing duration cannot satisfy the "> 300s" rule; skip it
            # instead of failing on a None comparison.
            if not duration or duration <= 300:
                continue
            creator = node.get('creator') or {}
            thumbnail_url = (node.get('thumbnail') or {}).get('url')
            v_data = {
                "index": calculated_index,
                "v_id": node.get("id"),
                "v_xid": node.get('xid'),
                "link": "https://www.dailymotion.com/video/" + node.get('xid'),
                "title": node.get("title"),
                "createtime": node.get("createdAt"),
                "duration": duration,
                "pic": thumbnail_url,
                "view": 0,
                "fans": 0,
                "videos": 0,
                "u_id": creator.get('id'),
                "u_xid": creator.get('xid'),
                "u_name": creator.get('name'),
                # NOTE(review): u_pic is filled with the video thumbnail, not
                # the creator avatar -- confirm this is intended.
                "u_pic": thumbnail_url,
            }
            video_list.append(v_data)
        time.sleep(1)
    return video_list


proxiesdict = db.get_proxy_agent_dict()


def search_worker(payload, kitem, flag):
    """Run one keyword search; meant to be submitted to the thread pool.

    Returns:
        ``(ok, flag, payload, kitem, v_list)`` where ``ok`` is False when an
        exception was raised and ``v_list`` is always a list.
    """
    try:
        gproxies = proxiesdict[kitem['rn']]
        header = gettoken(gproxies)
        v_list = get_searchInfo(kitem['keyword'], kitem['level'], header, gproxies)
        if not v_list:
            # Two more tries with a growing pause before giving up.
            for i in range(2):
                time.sleep(i * 5)
                v_list = get_searchInfo(kitem['keyword'], kitem['level'], header, gproxies)
                if v_list:
                    break
                time.sleep(2)
        if not v_list:
            v_list = []
        return True, flag, payload, kitem, v_list  # success
    except Exception as e:
        logger.exception(f"[线程异常] {kitem['keyword']} → {e}")
        traceback.print_exc()
        return False, flag, payload, kitem, []  # failure


def integrate_data_parallel():
    """Main loop: pull keyword tasks, search each one, and store the videos.

    Runs forever. The proxy map is reloaded on every round; when no tasks
    are available the loop sleeps 10 seconds. A failure while handling one
    task is logged and does not stop the loop.
    """
    global proxiesdict
    while True:
        proxiesdict = db.get_proxy_agent_dict()
        tasks, flag = db.item_keyword(1)
        if not tasks:
            time.sleep(10)
            continue

        for payload, kitem in tasks:
            try:
                proxname = proxiesdict.get(kitem['rn'])
                logger.info(proxname)
                # The token headers are not logged: they carry the bearer credential.
                h = gettoken(proxname)
                v_list = get_searchInfo(kitem['keyword'], kitem['level'], h, proxname)
                # get_searchInfo returns None on failure.
                for item in v_list or []:
                    DBSA.upsert_video({
                        "keyword": kitem["keyword"],
                        "v_name": kitem["v_name"],
                        "v_id": item["v_id"],
                        "v_xid": item["v_xid"],
                        "link": item["link"],
                        "title": item["title"],
                        "duration": format_duration(item["duration"]),
                        "fans": clean_dash_to_zero(item["fans"]),
                        "videos": clean_dash_to_zero(item["videos"]),
                        "watch_number": clean_dash_to_zero(item["view"]),
                        "create_time": format_create_time(item["createtime"]),
                        "cover_pic": item["pic"],
                        "index": item["index"],
                        "u_id": item["u_id"],
                        "u_xid": item["u_xid"],
                        "u_name": item["u_name"],
                        "u_pic": item["u_pic"],
                        "rn": kitem["rn"],
                        "batch": kitem["batch"],
                        "machine_id": MACHINE_ID,
                        "level": kitem["level"],
                    })
                DBSA.flush()
            except Exception as e:
                logger.exception(f"[线程异常] {kitem['keyword']} → {e}")


def parse_args() -> argparse.Namespace:
    """Parse CLI options and store them in ``MACHINE_ID`` / ``MAX_WORKERS``.

    Raises:
        ValueError: if no machine id is configured after parsing.
    """
    global MACHINE_ID, MAX_WORKERS

    parser = argparse.ArgumentParser(
        description="Configure worker settings."
    )
    parser.add_argument(
        "-m", "--machine-id",
        type=int,
        help=f"Machine identifier (default: {MACHINE_ID})"
    )
    parser.add_argument(
        "-w", "--max-workers",
        type=int,
        help=f"Maximum concurrent workers (default: {MAX_WORKERS})"
    )

    args = parser.parse_args()

    if args.machine_id is not None:
        MACHINE_ID = args.machine_id

    if args.max_workers is not None:
        if args.max_workers <= 0:
            parser.error("--max-workers 不能是 0")
        MAX_WORKERS = args.max_workers
    if MACHINE_ID is None:
        raise ValueError("请指定机器编号")
    return args


if __name__ == '__main__':
    # parse_args()
    start_time = datetime.datetime.now()
    logger.info(f"开始时间:{start_time.strftime('%Y-%m-%d %H:%M:%S')}")
    integrate_data_parallel()
    end_time = datetime.datetime.now()
    duration = end_time - start_time