"""Dailymotion keyword-search crawler.

Pulls keyword tasks from the database, queries the Dailymotion `/videos`
search API through per-region proxies, and upserts the results via DBSA.
Work is fanned out across a thread pool; failed tasks are rolled back by
level so they can be re-queued.
"""
import base64
import traceback
import argparse
import time
import requests
import datetime
from requests import RequestException
from DB import DBVidcon, DBSA
from dateutil import parser as date_parser
from concurrent.futures import ThreadPoolExecutor, as_completed
from logger import logger

db = DBVidcon()
MACHINE_ID = None
MAX_WORKERS = 10
# Placeholder pool; rebuilt in __main__ once --max-workers has been parsed.
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)


def get_part_ids(part_num: int, take: int, offset: int = 0):
    """Return the shard ids this machine should own and hint at the next offset."""
    part_ids = list(range(offset, offset + take))
    if max(part_ids) >= part_num:
        raise ValueError(f"Shard id out of range: PART_IDS={part_ids} exceeds PART_NUM={part_num}")
    next_offset = offset + take
    if next_offset < part_num:
        print(f"[hint] offset for the next machine should be: {next_offset}")
    else:
        print("[hint] current shards already cover the range; no further machines needed")
    return part_ids


def clean_dash_to_zero(val):
    """Normalize placeholder values ('-', '', None) and unparsable ints to 0."""
    if val in ('-', '', None):
        return 0
    try:
        return int(val)
    except (ValueError, TypeError) as e:
        logger.exception(f"[field error] val = {val} → {str(e)}")
        return 0


def format_create_time(timestr):
    """Convert an ISO-8601 timestamp to 'YYYY-MM-DD HH:MM:SS', epoch on failure."""
    try:
        dt = date_parser.isoparse(timestr)
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except Exception as e:
        logger.exception(f"[time format error] {timestr} → {str(e)}")
        return "1970-01-01 00:00:00"


def format_duration(seconds):
    """Render a duration in seconds as MM:SS, defaulting to 00:00."""
    try:
        seconds = int(seconds)
        return f"{seconds // 60:02}:{seconds % 60:02}"
    except Exception:
        return "00:00"


def get_searchInfo(keyword, level, headers, proxy_name, r=2):
    # `headers` is currently unused but kept for call-site compatibility.
    if r == 2:
        logger.info(f"NET fetch -> {keyword},\trn->{proxy_name},\tlevel->{level}")
    video_list = []
    max_page = 2
    limit = 30
    endpoint = 'https://api.dailymotion.com/videos'
    if level == 0 or level == 1:
        max_page = 3
        limit = 100
    for j in range(1, max_page):
        params = {
            'search': keyword,
            'fields': 'id,title,created_time,thumbnail_240_url,duration,'
                      'owner.id,owner.screenname,likes_total,views_total,'
                      'owner.avatar_60_url,owner.followers_total,owner.videos_total',
            'limit': limit,
            'page': j,
            'sort': "relevance",
        }
        proxy_string = db.get_proxy(proxy_name)
        logger.info(f"proxy: {proxy_string}")
        proxies = {
            'http': proxy_string,
            'https': proxy_string,
        }
        try:
            # Explicit timeout so a dead proxy cannot hang the worker thread.
            response = requests.get(endpoint, params=params, proxies=proxies, timeout=30)
            jsondata = response.json()
            resinfo = jsondata["list"]  # a missing "list" key triggers the retry below
        except (RequestException, ValueError, KeyError) as e:
            if r < 0:
                logger.exception(f"[search API] response could not be parsed, giving up → {e}")
                return None
            time.sleep((3 - r) * 5)
            return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
        for index, iteminfo in enumerate(resinfo):
            calculated_index = index + 1 + (j - 1) * limit
            xid = iteminfo["id"]
            vid = base64.b64encode(f"Video:{xid}".encode('utf-8')).decode('utf-8')
            # Dotted field names come back as flattened keys, e.g. 'owner.id'.
            uxid = iteminfo.get("owner.id")
            uid = base64.b64encode(f"Channel:{uxid}".encode('utf-8')).decode('utf-8')
            duration = iteminfo.get('duration')
            # Keep only videos longer than five minutes.
            if not duration or duration <= 300:
                continue
            v_data = {
                "index": calculated_index,
                "v_id": vid,
                "v_xid": xid,
                "link": "https://www.dailymotion.com/video/" + xid,
                "title": iteminfo.get("title"),
                "createtime": iteminfo.get('created_time'),
                "duration": iteminfo.get('duration'),
                "pic": iteminfo.get('thumbnail_240_url'),
                "view": iteminfo.get('views_total'),
                "fans": iteminfo.get('owner.followers_total'),
                "videos": iteminfo.get('owner.videos_total'),
                "u_id": uid,
                "u_xid": uxid,
                "u_name": iteminfo.get('owner.screenname'),
                "u_pic": iteminfo.get('owner.avatar_60_url'),
            }
            video_list.append(v_data)
        time.sleep(1)
    return video_list
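# Shape of a single `list` entry from the /videos endpoint, as consumed above.
# Illustrative sketch only: the flattened dotted keys follow from the `fields`
# parameter, but every value below is made up.
#
#   {
#       "id": "x8abcd1",
#       "title": "some title",
#       "created_time": ...,
#       "duration": 512,
#       "thumbnail_240_url": "https://...",
#       "views_total": 1024,
#       "owner.id": "x1owner",
#       "owner.screenname": "someone",
#       "owner.followers_total": 1234,
#       "owner.videos_total": 56,
#       "owner.avatar_60_url": "https://..."
#   }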
proxiesdict = db.get_proxy_agent_dict()


def search_worker(payload, kitem, flag):
    try:
        gproxies = proxiesdict[kitem['rn']]
        v_list = get_searchInfo(kitem['keyword'], kitem['level'], None, gproxies)
        if not v_list:
            # Retry twice with a growing back-off before giving up.
            for i in range(2):
                time.sleep(i * 5)
                v_list = get_searchInfo(kitem['keyword'], kitem['level'], None, gproxies)
                if v_list:
                    break
                time.sleep(2)
            if not v_list:
                v_list = []
        return True, flag, payload, kitem, v_list  # success
    except Exception as e:
        logger.exception(f"[worker error] {kitem['keyword']} → {e}")
        traceback.print_exc()
        return False, flag, payload, kitem, []  # failure


def integrate_data_parallel():
    while True:
        global proxiesdict
        proxiesdict = db.get_proxy_agent_dict()
        tasks, flag = db.item_keyword()
        if not tasks:
            time.sleep(10)
            continue
        futures = [
            executor.submit(search_worker, payload, kitem, flag)
            for payload, kitem in tasks
        ]
        # Collect payloads of failed tasks for rollback, keyed by level.
        rollback = {0: [], 1: [], 2: []}
        for fut in as_completed(futures):
            ok, f_flag, payload, kitem, v_list = fut.result()
            if not ok:
                rollback[f_flag].append(payload)
                continue
            for item in v_list:
                DBSA.upsert_video({
                    "keyword": kitem["keyword"],
                    "v_name": kitem["v_name"],
                    "v_id": item["v_id"],
                    "v_xid": item["v_xid"],
                    "link": item["link"],
                    "title": item["title"],
                    "duration": format_duration(item["duration"]),
                    "fans": clean_dash_to_zero(item["fans"]),
                    "videos": clean_dash_to_zero(item["videos"]),
                    "watch_number": clean_dash_to_zero(item["view"]),
                    "create_time": format_create_time(item["createtime"]),
                    "cover_pic": item["pic"],
                    "index": item["index"],
                    "u_id": item["u_id"],
                    "u_xid": item["u_xid"],
                    "u_name": item["u_name"],
                    "u_pic": item["u_pic"],
                    "rn": kitem["rn"],
                    "batch": kitem["batch"],
                    "machine_id": MACHINE_ID,
                    "level": kitem["level"],
                })
        DBSA.flush()
        if rollback[0]:
            db.rollback_l0(rollback[0])
        if rollback[1]:
            db.rollback_l1(rollback[1])
        if rollback[2]:
            db.rollback_l2(rollback[2])
        time.sleep(10)


def parse_args() -> argparse.Namespace:
    global MACHINE_ID, MAX_WORKERS
    parser = argparse.ArgumentParser(
        description="Configure worker settings."
    )
    parser.add_argument(
        "-m", "--machine-id",
        type=int,
        help=f"Machine identifier (default: {MACHINE_ID})"
    )
    parser.add_argument(
        "-w", "--max-workers",
        type=int,
        help=f"Maximum concurrent workers (default: {MAX_WORKERS})"
    )
    args = parser.parse_args()
    if args.machine_id is not None:
        MACHINE_ID = args.machine_id
    if args.max_workers is not None:
        if args.max_workers <= 0:
            parser.error("--max-workers must be a positive integer")
        MAX_WORKERS = args.max_workers
    if MACHINE_ID is None:
        raise ValueError("A machine id is required (use -m/--machine-id)")
    return args


if __name__ == '__main__':
    parse_args()
    # Rebuild the pool so the parsed --max-workers value takes effect.
    executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
    start_time = datetime.datetime.now()
    logger.info(f"Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
    integrate_data_parallel()
    end_time = datetime.datetime.now()
    duration = end_time - start_time
    logger.info(f"End time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}, elapsed: {duration}")
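# Usage sketch (the filename "crawler.py" is an assumption for illustration):
#   python crawler.py -m 0 -w 10
# -m/--machine-id is required (startup raises ValueError without it);
# -w/--max-workers defaults to 10 and must be a positive integer.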