From f9e88e98c9d5e18be66602d3a70bec012328f831 Mon Sep 17 00:00:00 2001 From: Franklin-F Date: Tue, 20 May 2025 23:16:32 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=87=8D=E6=9E=84=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E9=9B=86=E6=88=90=E9=80=BB=E8=BE=91=EF=BC=8C=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E5=B9=B6=E8=A1=8C=E5=A4=84=E7=90=86=E5=92=8C=E5=BC=82=E5=B8=B8?= =?UTF-8?q?=E5=9B=9E=E6=BB=9A=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.py | 127 +++++++++++++++++++++++++++----------------------------- 1 file changed, 62 insertions(+), 65 deletions(-) diff --git a/main.py b/main.py index f9eeac8..4010ac7 100644 --- a/main.py +++ b/main.py @@ -1,4 +1,5 @@ import random +import traceback from urllib.parse import quote import argparse import time @@ -627,86 +628,82 @@ def get_searchInfo(keyword, level, headers): proxiesdict = db.get_proxy_agent_dict() -def search_worker(payload, kitem): +def search_worker(payload, kitem, flag): try: gproxies = proxiesdict[kitem['rn']] - header = gettoken(gproxies) - + header = gettoken(gproxies) v_list = get_searchInfo(kitem['keyword'], kitem['level'], header) if not v_list: for i in range(2): time.sleep(i * 5) - v_list = get_searchInfo(kitem['keyword'], kitem['level']) + v_list = get_searchInfo(kitem['keyword'], kitem['level'], header) if v_list: break time.sleep(2) - return payload, kitem, v_list + return True, flag, payload, kitem, v_list # 成功 except Exception as e: - raise RuntimeError(f"{kitem['keyword']} 处理失败: {e}") from e + print(f"[线程异常] {kitem['keyword']} → {e}") + traceback.print_exc() + return False, flag, payload, kitem, [] # 失败 -def integrate_data(): +executor = concurrent.futures.ThreadPoolExecutor(MAX_WORKERS) +def integrate_data_parallel(): while True: - keywords, flag = db.item_keyword() - if len(keywords) < 1: - time.sleep(30) - else: - for index, (payload, kitem) in enumerate(keywords): - try: - proxiesdict = db.get_proxy_agent_dict() - global Gproxies - Gproxies = proxiesdict[kitem['rn']] - gettoken() - v_list = get_searchInfo(kitem['keyword'], kitem['level']) + tasks, flag = db.item_keyword() + if not tasks: + time.sleep(10) + continue - if not v_list: - for i in range(3): - time.sleep(i * 5) - v_list = get_searchInfo(kitem["keyword"], kitem['level'], headers) - if v_list: - break - time.sleep(2) + futures = [ + executor.submit(search_worker, payload, kitem, flag) + for payload, kitem in tasks + ] - for item in v_list: - record = { - "keyword": kitem.get("keyword"), - "v_name": kitem.get("v_name"), - "v_id": item.get("v_id"), - "v_xid": item.get("v_xid"), - "link": item.get("link"), - "title": item.get("title"), - "duration": format_duration(item.get("duration")), - "fans": clean_dash_to_zero(item.get("fans", 0)), - "videos": clean_dash_to_zero(item.get("videos", 0)), - "watch_number": clean_dash_to_zero(item.get("view", 0)), - "create_time": format_create_time(item.get("createtime")), - "cover_pic": item.get("pic"), - "index": item.get("index", 0), - "u_id": item.get("u_id"), - "u_xid": item.get("u_xid"), - "u_name": item.get("u_name"), - "u_pic": item.get("u_pic"), - "rn": kitem.get("rn"), - "batch": kitem['batch'], - "machine_id": MACHINE_ID, - "level": kitem['level'], - } - db.upsert_video(record) - db.flush() - except Exception as e: - print(f"[异常] {str(e.__class__.__name__)}: {str(e)}") - print(f"[异常] 处理关键词 {kitem['keyword']} 时发生错误,正在回滚...") - time.sleep(5) - remaining_payloads = [p for p, _ in keywords[index:]] - if flag == 0: - db.rollback_l0(remaining_payloads) - elif flag == 1: - db.rollback_l1(remaining_payloads) - elif flag == 2: - db.rollback_l2(remaining_payloads) - time.sleep(5) - break + # 统计回滚 + rollback = {0: [], 1: [], 2: []} + + for fut in concurrent.futures.as_completed(futures): + ok, f_flag, payload, kitem, v_list = fut.result() + + if not ok: + rollback[f_flag].append(payload) + continue + + # —— 写库:可按你原来的 upsert / flush 逻辑 —— + for item in v_list: + record = { + "keyword": kitem["keyword"], + "v_name": kitem["v_name"], + "v_id": item["v_id"], + "v_xid": item["v_xid"], + "link": item["link"], + "title": item["title"], + "duration": format_duration(item["duration"]), + "fans": clean_dash_to_zero(item["fans"]), + "videos": clean_dash_to_zero(item["videos"]), + "watch_number": clean_dash_to_zero(item["view"]), + "create_time": format_create_time(item["createtime"]), + "cover_pic": item["pic"], + "index": item["index"], + "u_id": item["u_id"], + "u_xid": item["u_xid"], + "u_name": item["u_name"], + "u_pic": item["u_pic"], + "rn": kitem["rn"], + "batch": kitem["batch"], + "machine_id": MACHINE_ID, + "level": kitem["level"], + } + db.upsert_video(record) + db.flush() + if rollback[0]: + db.rollback_l0(rollback[0]) + if rollback[1]: + db.rollback_l1(rollback[1]) + if rollback[2]: + db.rollback_l2(rollback[2]) def parse_args() -> argparse.Namespace: @@ -744,6 +741,6 @@ if __name__ == '__main__': parse_args() start_time = datetime.datetime.now() print(f"开始时间:{start_time.strftime('%Y-%m-%d %H:%M:%S')}") - integrate_data() + integrate_data_parallel() end_time = datetime.datetime.now() duration = end_time - start_time