feat: 重构数据集成逻辑,支持并行处理和异常回滚机制

This commit is contained in:
晓丰 2025-05-20 23:16:32 +08:00
parent 456be9f8c5
commit f9e88e98c9

113
main.py
View File

@ -1,4 +1,5 @@
import random import random
import traceback
from urllib.parse import quote from urllib.parse import quote
import argparse import argparse
import time import time
@ -627,86 +628,82 @@ def get_searchInfo(keyword, level, headers):
proxiesdict = db.get_proxy_agent_dict() proxiesdict = db.get_proxy_agent_dict()
def search_worker(payload, kitem): def search_worker(payload, kitem, flag):
try: try:
gproxies = proxiesdict[kitem['rn']] gproxies = proxiesdict[kitem['rn']]
header = gettoken(gproxies) header = gettoken(gproxies)
v_list = get_searchInfo(kitem['keyword'], kitem['level'], header) v_list = get_searchInfo(kitem['keyword'], kitem['level'], header)
if not v_list: if not v_list:
for i in range(2): for i in range(2):
time.sleep(i * 5) time.sleep(i * 5)
v_list = get_searchInfo(kitem['keyword'], kitem['level']) v_list = get_searchInfo(kitem['keyword'], kitem['level'], header)
if v_list: if v_list:
break break
time.sleep(2) time.sleep(2)
return payload, kitem, v_list return True, flag, payload, kitem, v_list # 成功
except Exception as e: except Exception as e:
raise RuntimeError(f"{kitem['keyword']} 处理失败: {e}") from e print(f"[线程异常] {kitem['keyword']}{e}")
traceback.print_exc()
return False, flag, payload, kitem, [] # 失败
def integrate_data(): executor = concurrent.futures.ThreadPoolExecutor(MAX_WORKERS)
def integrate_data_parallel():
while True: while True:
keywords, flag = db.item_keyword() tasks, flag = db.item_keyword()
if len(keywords) < 1: if not tasks:
time.sleep(30) time.sleep(10)
else: continue
for index, (payload, kitem) in enumerate(keywords):
try:
proxiesdict = db.get_proxy_agent_dict()
global Gproxies
Gproxies = proxiesdict[kitem['rn']]
gettoken()
v_list = get_searchInfo(kitem['keyword'], kitem['level'])
if not v_list: futures = [
for i in range(3): executor.submit(search_worker, payload, kitem, flag)
time.sleep(i * 5) for payload, kitem in tasks
v_list = get_searchInfo(kitem["keyword"], kitem['level'], headers) ]
if v_list:
break
time.sleep(2)
# 统计回滚
rollback = {0: [], 1: [], 2: []}
for fut in concurrent.futures.as_completed(futures):
ok, f_flag, payload, kitem, v_list = fut.result()
if not ok:
rollback[f_flag].append(payload)
continue
# —— 写库:可按你原来的 upsert / flush 逻辑 ——
for item in v_list: for item in v_list:
record = { record = {
"keyword": kitem.get("keyword"), "keyword": kitem["keyword"],
"v_name": kitem.get("v_name"), "v_name": kitem["v_name"],
"v_id": item.get("v_id"), "v_id": item["v_id"],
"v_xid": item.get("v_xid"), "v_xid": item["v_xid"],
"link": item.get("link"), "link": item["link"],
"title": item.get("title"), "title": item["title"],
"duration": format_duration(item.get("duration")), "duration": format_duration(item["duration"]),
"fans": clean_dash_to_zero(item.get("fans", 0)), "fans": clean_dash_to_zero(item["fans"]),
"videos": clean_dash_to_zero(item.get("videos", 0)), "videos": clean_dash_to_zero(item["videos"]),
"watch_number": clean_dash_to_zero(item.get("view", 0)), "watch_number": clean_dash_to_zero(item["view"]),
"create_time": format_create_time(item.get("createtime")), "create_time": format_create_time(item["createtime"]),
"cover_pic": item.get("pic"), "cover_pic": item["pic"],
"index": item.get("index", 0), "index": item["index"],
"u_id": item.get("u_id"), "u_id": item["u_id"],
"u_xid": item.get("u_xid"), "u_xid": item["u_xid"],
"u_name": item.get("u_name"), "u_name": item["u_name"],
"u_pic": item.get("u_pic"), "u_pic": item["u_pic"],
"rn": kitem.get("rn"), "rn": kitem["rn"],
"batch": kitem['batch'], "batch": kitem["batch"],
"machine_id": MACHINE_ID, "machine_id": MACHINE_ID,
"level": kitem['level'], "level": kitem["level"],
} }
db.upsert_video(record) db.upsert_video(record)
db.flush() db.flush()
except Exception as e: if rollback[0]:
print(f"[异常] {str(e.__class__.__name__)}: {str(e)}") db.rollback_l0(rollback[0])
print(f"[异常] 处理关键词 {kitem['keyword']} 时发生错误,正在回滚...") if rollback[1]:
time.sleep(5) db.rollback_l1(rollback[1])
remaining_payloads = [p for p, _ in keywords[index:]] if rollback[2]:
if flag == 0: db.rollback_l2(rollback[2])
db.rollback_l0(remaining_payloads)
elif flag == 1:
db.rollback_l1(remaining_payloads)
elif flag == 2:
db.rollback_l2(remaining_payloads)
time.sleep(5)
break
def parse_args() -> argparse.Namespace: def parse_args() -> argparse.Namespace:
@ -744,6 +741,6 @@ if __name__ == '__main__':
parse_args() parse_args()
start_time = datetime.datetime.now() start_time = datetime.datetime.now()
print(f"开始时间:{start_time.strftime('%Y-%m-%d %H:%M:%S')}") print(f"开始时间:{start_time.strftime('%Y-%m-%d %H:%M:%S')}")
integrate_data() integrate_data_parallel()
end_time = datetime.datetime.now() end_time = datetime.datetime.now()
duration = end_time - start_time duration = end_time - start_time