diff --git a/main.py b/main.py
index 6196593..f9eeac8 100644
--- a/main.py
+++ b/main.py
@@ -9,8 +9,9 @@ import datetime
 from requests import RequestException
 from DB import DBVidcon
 from dateutil import parser as date_parser
+import copy
+from threading import Lock
 
-batch = str(int(time.time()))
 db = DBVidcon()
 MACHINE_ID = None
 MAX_WORKERS = 10
@@ -82,7 +83,8 @@ headers1 = {
     'x-dm-visit-id': '1745971699160',
     'x-dm-visitor-id': '64633D03065F4186AD0B027CF795EF0F',
 }
-
+_headers_cache = None  # most recently successful headers
+_cache_lock = Lock()
 
 Gproxies = None
 
@@ -120,12 +122,12 @@ def get_proxies(g):
     return proxies
 
 
-def post_with_retry(url, json_payload=None, data=None, headers=None, proxies=None,
+def post_with_retry(url, proxy_name, json_payload=None, data=None, headers=None,
                     retries=5, timeout=10, backoff_factor=2, verbose=True):
     token_refreshed = False
     for attempt in range(1, retries + 1):
         try:
-            proxy_str = db.get_proxy(Gproxies)
+            proxy_str = db.get_proxy(proxy_name)
 
             proxies = {"http": proxy_str, "https": proxy_str}
 
@@ -154,7 +156,7 @@ def post_with_retry(url, json_payload=None, data=None, headers=None, proxies=Non
             if not token_refreshed:
                 if verbose:
                     print("[post_with_retry] 刷新 token 后再试")
-                gettoken()
+                gettoken(proxy_name)
                 token_refreshed = True
                 continue
             if attempt == retries:
@@ -168,7 +170,8 @@ def post_with_retry(url, json_payload=None, data=None, headers=None, proxies=Non
             time.sleep(sleep_time)
 
 
-def gettoken():
+def gettoken(proxy, r=2):
+    global _headers_cache
     headers = {
         'Accept': '*/*',
         'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
@@ -186,7 +189,6 @@ def gettoken():
         'sec-ch-ua-mobile': '?0',
         'sec-ch-ua-platform': '"Windows"',
     }
-
     u = uuid.uuid4()
     uuid_with_dash = str(u)
     uuid_no_dash = u.hex
@@ -199,20 +201,32 @@ def gettoken():
         'visitor_id': uuid_with_dash,
     }
     try:
-        # proxy_str = db.get_proxy(Gproxies)
-        proxy_str = db.get_proxy(Gproxies)
+        proxy_str = db.get_proxy(proxy)
         url = 'https://graphql.api.dailymotion.com/oauth/token'
         response = requests.post(url, headers=headers, data=data, proxies={"http": proxy_str, "https": proxy_str})
         token = response.json()['access_token']
-        headers1['authorization'] = "Bearer " + token
-        headers1['x-dm-visit-id'] = str(int(time.time() * 1000))
-        headers1['x-dm-visitor-id'] = uuid_no_dash
+        copy_headers = copy.deepcopy(headers1)
+        copy_headers['authorization'] = "Bearer " + token
+        copy_headers['x-dm-visit-id'] = str(int(time.time() * 1000))
+        copy_headers['x-dm-visitor-id'] = uuid_no_dash
+        with _cache_lock:
+            _headers_cache = copy_headers
+        return copy_headers
     except Exception as e:
-        print(str(e))
-        pass
+        print("[gettoken] failed:", e)
+        if r > 0:
+            time.sleep(5)
+            return gettoken(proxy, r - 1)
+        else:
+            with _cache_lock:
+                if _headers_cache:
+                    print("[gettoken] falling back to cached headers")
+                    return copy.deepcopy(_headers_cache)
+            # still nothing cached → return the bare template (no Authorization)
+            return copy.deepcopy(headers1)
 
 
-def get_searchInfo(keyword, level):
+def get_searchInfo(keyword, level, headers, proxy_name=None):
     video_list = []
     max_page = 2
     limit = 30
@@ -568,7 +582,7 @@ def get_searchInfo(keyword, level):
         response = post_with_retry(
             "https://graphql.api.dailymotion.com/",
+            proxy_name or Gproxies,
             json_payload=data,
-            headers=headers1,
-            proxies=None
+            headers=headers,
         )
 
@@ -610,6 +624,28 @@ def get_searchInfo(keyword, level):
     return video_list
 
 
+proxiesdict = db.get_proxy_agent_dict()
+
+
+def search_worker(payload, kitem):
+    try:
+        gproxies = proxiesdict[kitem['rn']]
+        header = gettoken(gproxies)
+
+        v_list = get_searchInfo(kitem['keyword'], kitem['level'], header, gproxies)
+        if not v_list:
+            for i in range(2):
+                time.sleep(i * 5)
+                v_list = get_searchInfo(kitem['keyword'], kitem['level'], header, gproxies)
+                if v_list:
+                    break
+            time.sleep(2)
+
+        return payload, kitem, v_list
+    except Exception as e:
+        raise RuntimeError(f"{kitem['keyword']} processing failed: {e}") from e
+
+
 def integrate_data():
     while True:
         keywords, flag = db.item_keyword()
@@ -627,7 +663,7 @@ def integrate_data():
                 if not v_list:
                     for i in range(3):
                         time.sleep(i * 5)
-                        v_list = get_searchInfo(kitem["keyword"], kitem['level'])
+                        v_list = get_searchInfo(kitem["keyword"], kitem['level'], headers)
                         if v_list:
                             break
                     time.sleep(2)
@@ -663,10 +699,12 @@ def integrate_data():
             print(f"[异常] 处理关键词 {kitem['keyword']} 时发生错误,正在回滚...")
             time.sleep(5)
             remaining_payloads = [p for p, _ in keywords[index:]]
-            if flag == 2:
-                db.rollback(remaining_payloads)
+            if flag == 0:
+                db.rollback_l0(remaining_payloads)
             elif flag == 1:
-                db.rollback_records(remaining_payloads)
+                db.rollback_l1(remaining_payloads)
+            elif flag == 2:
+                db.rollback_l2(remaining_payloads)
             time.sleep(5)
             break
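
Note: the hunk that actually drives search_worker from a thread pool is not part of this diff, so the sketch below is only an illustration of how it could be wired up. It assumes keyword batches still come from db.item_keyword() as in integrate_data(); integrate_data_mt and handle_result are placeholder names, not functions added by this patch.

from concurrent.futures import ThreadPoolExecutor, as_completed

def integrate_data_mt():
    keywords, flag = db.item_keyword()  # same (payload, kitem) batch shape as integrate_data()
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
        futures = {pool.submit(search_worker, payload, kitem): kitem
                   for payload, kitem in keywords}
        for fut in as_completed(futures):
            try:
                payload, kitem, v_list = fut.result()
                handle_result(payload, kitem, v_list)  # hypothetical: persist v_list / update flags
            except RuntimeError as e:
                # search_worker re-raises with the keyword embedded in the message
                print(e)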