diff --git a/main.py b/main.py
index 61fdd33..e886ddf 100644
--- a/main.py
+++ b/main.py
@@ -1,3 +1,4 @@
+import json
 import random
 import traceback
 from urllib.parse import quote
@@ -227,7 +228,7 @@
     return copy.deepcopy(headers1)
 
 
-def get_searchInfo(keyword, level, headers, proxy_name):
+def get_searchInfo(keyword, level, headers, proxy_name, r=2):
     video_list = []
     max_page = 2
     limit = 30
@@ -589,12 +590,25 @@ def get_searchInfo(keyword, level, headers, proxy_name):
 
         jsondata = response.json()
         try:
-            resinfo = jsondata['data']['search']['stories']['edges']
-            print('resinfo :', len(resinfo))
+            errors = jsondata.get("errors")  # GraphQL errors 数组
+            stories = jsondata.get("data", {}).get("search", {}).get("stories")
+
+            if errors or stories is None:  # 有错误 或 stories 为 null
+                if r == 0:
+                    print("连续 3 次错误或空结果:", json.dumps(jsondata, ensure_ascii=False))
+                    return None
+                time.sleep((3 - r) * 5)
+                return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
+            resinfo = stories["edges"]
+            print("resinfo :", len(resinfo))
         except Exception:
-            resinfo = []
-            print("[搜索接口]", response.text)
-            print("返回字段解析错误!")
+            if r < 0:
+                print("[搜索接口] 未知:未处理", response.text)
+                print("返回字段解析错误!")
+                return None
+            else:
+                time.sleep((3 - r) * 5)
+                return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
         for index, iteminfo in enumerate(resinfo):
             calculated_index = index + 1 + (j - 1) * 100
             node = iteminfo['node']
@@ -631,24 +645,26 @@
 proxiesdict = db.get_proxy_agent_dict()
 def search_worker(payload, kitem, flag):
     try:
         gproxies = proxiesdict[kitem['rn']]
-        header = gettoken(gproxies) 
+        header = gettoken(gproxies)
         v_list = get_searchInfo(kitem['keyword'], kitem['level'], header, gproxies)
         if not v_list:
             for i in range(2):
                 time.sleep(i * 5)
-                v_list = get_searchInfo(kitem['keyword'], kitem['level'], header)
+                v_list = get_searchInfo(kitem['keyword'], kitem['level'], header, gproxies)
                 if v_list:
                     break
                 time.sleep(2)
-        return True, flag, payload, kitem, v_list # 成功
+        return True, flag, payload, kitem, v_list  # 成功
     except Exception as e:
         print(f"[线程异常] {kitem['keyword']} → {e}")
         traceback.print_exc()
-        return False, flag, payload, kitem, []   # 失败
+        return False, flag, payload, kitem, []  # 失败
 
 
 executor = concurrent.futures.ThreadPoolExecutor(MAX_WORKERS)
+
+
 def integrate_data_parallel():
     while True:
         tasks, flag = db.item_keyword()
@@ -675,26 +691,26 @@
             for item in v_list:
                 record = {
                     "keyword": kitem["keyword"],
-                    "v_name": kitem["v_name"], 
-                    "v_id": item["v_id"], 
-                    "v_xid": item["v_xid"], 
-                    "link": item["link"], 
-                    "title": item["title"], 
-                    "duration": format_duration(item["duration"]), 
-                    "fans": clean_dash_to_zero(item["fans"]), 
-                    "videos": clean_dash_to_zero(item["videos"]), 
-                    "watch_number": clean_dash_to_zero(item["view"]), 
-                    "create_time": format_create_time(item["createtime"]), 
-                    "cover_pic": item["pic"], 
-                    "index": item["index"], 
-                    "u_id": item["u_id"], 
-                    "u_xid": item["u_xid"], 
+                    "v_name": kitem["v_name"],
+                    "v_id": item["v_id"],
+                    "v_xid": item["v_xid"],
+                    "link": item["link"],
+                    "title": item["title"],
+                    "duration": format_duration(item["duration"]),
+                    "fans": clean_dash_to_zero(item["fans"]),
+                    "videos": clean_dash_to_zero(item["videos"]),
+                    "watch_number": clean_dash_to_zero(item["view"]),
+                    "create_time": format_create_time(item["createtime"]),
+                    "cover_pic": item["pic"],
+                    "index": item["index"],
+                    "u_id": item["u_id"],
+                    "u_xid": item["u_xid"],
                     "u_name": item["u_name"],
-                    "u_pic": item["u_pic"], 
-                    "rn": kitem["rn"], 
-                    "batch": kitem["batch"], 
+                    "u_pic": item["u_pic"],
+                    "rn": kitem["rn"],
+                    "batch": kitem["batch"],
                     "machine_id": MACHINE_ID,
-                    "level": kitem["level"], 
+                    "level": kitem["level"],
                 }
                 db.upsert_video(record)
                 db.flush()