"""Dailymotion keyword-search worker.

Pulls search results for queued keywords through per-region proxies,
filters out short videos, and upserts the rows via DBSA. Failed tasks
are rolled back by level so another machine can pick them up.
"""
import argparse
import base64
import concurrent.futures
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

import requests
from dateutil import parser as date_parser

from DB import DBVidcon, DBSA
from logger import logger

db = DBVidcon()
MACHINE_ID = None
MAX_WORKERS = 10
# Placeholder executor; recreated in __main__ once --max-workers is parsed.
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)


def get_part_ids(part_num: int, take: int, offset: int = 0):
    """Return the partition ids this machine should handle and print the
    offset the next machine should start from."""
    part_ids = list(range(offset, offset + take))
    if max(part_ids) >= part_num:
        raise ValueError(f"Partition id out of range: PART_IDS={part_ids} exceeds PART_NUM={part_num}")
    next_offset = offset + take
    if next_offset < part_num:
        print(f"[hint] offset for the next machine should be: {next_offset}")
    else:
        print("[hint] current partitions already reach the end; no more machines needed")
    return part_ids


def clean_dash_to_zero(val):
    """Normalize placeholder values ('-', '', None) to 0 before int()."""
    if val in ('-', '', None):
        return 0
    try:
        return int(val)
    except (ValueError, TypeError) as e:
        logger.exception(f"[field error] val = {val} → {e}")
        return 0


def format_create_time(timestr):
    try:
        dt = date_parser.isoparse(timestr)
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except Exception as e:
        logger.exception(f"[time format error] {timestr} → {e}")
        return "1970-01-01 00:00:00"


def format_duration(seconds):
    """Render a duration in seconds as MM:SS; fall back to 00:00."""
    try:
        seconds = int(seconds)
        return f"{seconds // 60:02}:{seconds % 60:02}"
    except Exception:
        return "00:00"


def get_searchInfo(keyword, level, headers, proxy_name, r=2):
    """Query the Dailymotion search API for one keyword.

    `r` counts the remaining retries; the call recurses with r - 1 on a
    bad response and gives up (returns None) once r drops below 0.
    """
    if r == 2:
        logger.info(f"NET fetch -> {keyword},\trn -> {proxy_name},\tlevel -> {level}")
    video_list = []
    max_page = 2
    limit = 30
    endpoint = 'https://api.dailymotion.com/videos'
    if level == 0 or level == 1:
        # High-priority levels scan more pages with a bigger page size.
        max_page = 3
        limit = 100
    for j in range(1, max_page):
        params = {
            'search': keyword,
            'fields': 'id,title,created_time,thumbnail_240_url,duration,'
                      'owner.id,owner.screenname,likes_total,views_total,'
                      'owner.avatar_60_url,owner.followers_total,owner.videos_total',
            'limit': limit,
            'page': j,
            'sort': "relevance",
        }
        proxy_string = db.get_proxy(proxy_name)
        logger.info(f"proxy: {proxy_string}")
        proxies = {
            'http': proxy_string,
            'https': proxy_string,
        }
        # timeout is an assumption (30 s) so a dead proxy cannot hang the worker.
        response = requests.get(endpoint, params=params, proxies=proxies, timeout=30)
        try:
            # Index "list" directly so a missing key (or a non-JSON body)
            # raises here and triggers the retry path below; .get() would
            # silently return None and crash outside the try block.
            resinfo = response.json()["list"]
        except Exception:
            if r < 0:
                logger.exception(f"[search API] unhandled response: {response.text}")
                logger.exception("failed to parse the response fields!")
                return None
            else:
                time.sleep((3 - r) * 5)
                return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
        for index, iteminfo in enumerate(resinfo):
            calculated_index = index + 1 + (j - 1) * limit
            xid = iteminfo["id"]
            vid = base64.b64encode(f"Video:{xid}".encode('utf-8')).decode('utf-8')
            uxid = iteminfo["owner.id"]
            uid = base64.b64encode(f"Channel:{uxid}".encode('utf-8')).decode('utf-8')
            # Skip videos of five minutes or less; guard against a null duration.
            duration = iteminfo.get('duration') or 0
            if duration <= 300:
                continue
            v_data = {
                "index": calculated_index,
                "v_id": vid,
                "v_xid": xid,
                "link": "https://www.dailymotion.com/video/" + xid,
                "title": iteminfo.get("title"),
                "createtime": datetime.fromtimestamp(iteminfo.get("created_time")).strftime("%Y-%m-%d %H:%M:%S"),
                "duration": duration,
                "pic": iteminfo.get('thumbnail_240_url'),
                "view": iteminfo.get('views_total'),
                "fans": iteminfo.get('owner.followers_total'),
                "videos": iteminfo.get('owner.videos_total'),
                "u_id": uid,
                "u_xid": uxid,
                "u_name": iteminfo.get('owner.screenname'),
                "u_pic": iteminfo.get('owner.avatar_60_url'),
            }
            print(v_data)
            video_list.append(v_data)
        time.sleep(1)
    return video_list


proxiesdict = db.get_proxy_agent_dict()


def search_worker(payload, kitem, flag):
    """Fetch one keyword inside the thread pool; never raises."""
    try:
        gproxies = proxiesdict[kitem['rn']]
        v_list = get_searchInfo(kitem['keyword'], kitem['level'], None, gproxies)
        if not v_list:
            # Retry twice more with a growing back-off before giving up.
            for i in range(2):
                time.sleep(i * 5)
                v_list = get_searchInfo(kitem['keyword'], kitem['level'], None, gproxies)
                if v_list:
                    break
                time.sleep(2)
            if not v_list:
                v_list = []
        return True, flag, payload, kitem, v_list  # success
    except Exception as e:
        logger.exception(f"[thread error] {kitem['keyword']} → {e}")
        traceback.print_exc()
        return False, flag, payload, kitem, []  # failure


def integrate_data_parallel():
    global proxiesdict
    while True:
        proxiesdict = db.get_proxy_agent_dict()
        tasks, flag = db.item_keyword()
        if not tasks:
            time.sleep(10)
            continue

        futures = [
            executor.submit(search_worker, payload, kitem, flag)
            for payload, kitem in tasks
        ]

        # Payloads of failed tasks, grouped by level for rollback.
        rollback = {0: [], 1: [], 2: []}

        for fut in concurrent.futures.as_completed(futures):
            ok, f_flag, payload, kitem, v_list = fut.result()
            if not ok:
                rollback[f_flag].append(payload)
                continue
            for item in v_list:
                DBSA.upsert_video({
                    "keyword": kitem["keyword"],
                    "v_name": kitem["v_name"],
                    "v_id": item["v_id"],
                    "v_xid": item["v_xid"],
                    "link": item["link"],
                    "title": item["title"],
                    "duration": format_duration(item["duration"]),
                    "fans": clean_dash_to_zero(item["fans"]),
                    "videos": clean_dash_to_zero(item["videos"]),
                    "watch_number": clean_dash_to_zero(item["view"]),
                    "create_time": format_create_time(item["createtime"]),
                    "cover_pic": item["pic"],
                    "index": item["index"],
                    "u_id": item["u_id"],
                    "u_xid": item["u_xid"],
                    "u_name": item["u_name"],
                    "u_pic": item["u_pic"],
                    "rn": kitem["rn"],
                    "batch": kitem["batch"],
                    "machine_id": MACHINE_ID,
                    "level": kitem["level"],
                })
            DBSA.flush()

        if rollback[0]:
            db.rollback_l0(rollback[0])
        if rollback[1]:
            db.rollback_l1(rollback[1])
        if rollback[2]:
            db.rollback_l2(rollback[2])

        time.sleep(10)


def parse_args() -> argparse.Namespace:
    global MACHINE_ID, MAX_WORKERS
    parser = argparse.ArgumentParser(description="Configure worker settings.")
    parser.add_argument(
        "-m", "--machine-id",
        type=int,
        help=f"Machine identifier (default: {MACHINE_ID})",
    )
    parser.add_argument(
        "-w", "--max-workers",
        type=int,
        help=f"Maximum concurrent workers (default: {MAX_WORKERS})",
    )
    args = parser.parse_args()

    if args.machine_id is not None:
        MACHINE_ID = args.machine_id
    if args.max_workers is not None:
        if args.max_workers <= 0:
            parser.error("--max-workers must be a positive integer")
        MAX_WORKERS = args.max_workers
    if MACHINE_ID is None:
        parser.error("a machine id is required (-m/--machine-id)")
    return args


if __name__ == '__main__':
    parse_args()
    # Rebuild the executor so the parsed --max-workers value takes effect.
    executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
    start_time = datetime.now()
    logger.info(f"start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
    integrate_data_parallel()
    end_time = datetime.now()
    duration = end_time - start_time
    logger.info(f"end time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}, elapsed: {duration}")