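"""Dailymotion keyword-search worker.

Leases keyword tasks from the shared queue, queries the public Dailymotion
`/videos` search API through per-task proxies in a thread pool, normalizes
each hit, and upserts the rows via DBSA; failed tasks are rolled back to
their per-level queues.
"""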
import base64
import traceback
import argparse
import time
import concurrent.futures
import requests
from datetime import datetime
from DB import DBVidcon, DBSA
from dateutil import parser as date_parser
from concurrent.futures import ThreadPoolExecutor
from logger import logger

db = DBVidcon()
MACHINE_ID = None
MAX_WORKERS = 10
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)


def get_part_ids(part_num: int, take: int, offset: int = 0):
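    """Compute the contiguous shard ids this machine is responsible for.

    Raises ValueError when the requested window [offset, offset + take) runs
    past the total shard count, and prints the offset the next machine should
    use. Helper for multi-machine partitioning; not called in this script.
    """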
    part_ids = list(range(offset, offset + take))
    if max(part_ids) >= part_num:
        raise ValueError(f"Shard ids out of range: PART_IDS={part_ids} exceeds PART_NUM={part_num}")
    next_offset = offset + take
    if next_offset < part_num:
        print(f"[Hint] The next machine's offset should be: {next_offset}")
    else:
        print("[Hint] The current shards already cover the tail; no more machines are needed.")
    return part_ids


def clean_dash_to_zero(val):
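    """Normalize placeholder values ('-', '', None) to 0; otherwise cast to int."""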
    if val in ('-', '', None):
        return 0
    try:
        return int(val)
    except (ValueError, TypeError) as e:
        logger.exception(f"[Field error] val = {val} → {e}")
        return 0


def format_create_time(timestr):
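    """Render an ISO-8601 timestamp as 'YYYY-MM-DD HH:MM:SS'; fall back to
    the Unix epoch string on parse failure."""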
    try:
        dt = date_parser.isoparse(timestr)
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except Exception as e:
        logger.exception(f"[Time format error] {timestr} → {e}")
        return "1970-01-01 00:00:00"


def format_duration(seconds):
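    """Render a duration in seconds as 'MM:SS'; return '00:00' on bad input."""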
    try:
        seconds = int(seconds)
        return f"{seconds // 60:02}:{seconds % 60:02}"
    except Exception:
        return "00:00"


def get_searchInfo(keyword, level, headers, proxy_name, r=2):
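    """Query the Dailymotion /videos search endpoint for `keyword` through the
    named proxy and return a list of normalized video dicts.

    `r` is the remaining retry budget: the call retries itself with r - 1 and
    a growing back-off after request or parse failures. `headers` is accepted
    for interface compatibility but is not currently sent with the request.
    """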
    if r == 2:
        logger.info(f"NET fetch -> {keyword},\trn -> {proxy_name},\tlevel -> {level}")
    video_list = []
    max_page = 2
    limit = 30
    endpoint = 'https://api.dailymotion.com/videos'
    if level == 0 or level == 1:
        # Top-level keywords get a deeper crawl: bigger pages, one more page.
        max_page = 3
        limit = 100
    for j in range(1, max_page):  # note: fetches pages 1 .. max_page - 1
        params = {
            'search': keyword,
            'fields': 'id,title,created_time,thumbnail_240_url,duration,owner.id,owner.screenname,likes_total,views_total,owner.avatar_60_url,owner.followers_total,owner.videos_total',
            'limit': limit,
            'page': j,
            'sort': "relevance"
        }
        proxy_string = db.get_proxy(proxy_name)
        logger.info(f"Proxy: {proxy_string}")
        proxies = {
            'http': proxy_string,
            'https': proxy_string,
        }
        try:
            response = requests.get(endpoint, params=params, proxies=proxies)
            jsondata = response.json()
        except Exception as e:
            if r < 0:
                # Retry budget exhausted; give up on this keyword.
                logger.exception(f"[Request] unhandled error: {e}, keyword: {keyword}, level: {level}")
                return None
            # Back off harder on each retry, then recurse with one fewer attempt.
            time.sleep((3 - r) * 5)
            return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
        resinfo = jsondata.get("list")
        if resinfo is None:
            if r < 0:
                logger.exception(f"[Search API] unhandled response: {response.text}")
                logger.exception("Failed to parse the response fields!")
                return None
            time.sleep((3 - r) * 5)
            return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
        for index, iteminfo in enumerate(resinfo):
            calculated_index = index + 1 + (j - 1) * limit
            xid = iteminfo["id"]
            vid = base64.b64encode(f"Video:{xid}".encode('utf-8')).decode('utf-8')
            uxid = iteminfo["owner.id"]
            uid = base64.b64encode(f"Channel:{uxid}".encode('utf-8')).decode('utf-8')
            duration = iteminfo.get('duration')
            if duration is None or duration <= 300:
                # Skip clips of five minutes or less.
                continue
            v_data = {
                "index": calculated_index,
                "v_id": vid,
                "v_xid": xid,
                "link": "https://www.dailymotion.com/video/" + xid,
                "title": iteminfo.get("title"),
                "createtime": datetime.fromtimestamp(iteminfo.get("created_time")).strftime("%Y-%m-%d %H:%M:%S"),
                "duration": iteminfo.get('duration'),
                "pic": iteminfo.get('thumbnail_240_url'),
                "view": iteminfo.get('views_total'),
                "fans": iteminfo.get('owner.followers_total'),
                "videos": iteminfo.get('owner.videos_total'),
                "u_id": uid,
                "u_xid": uxid,
                "u_name": iteminfo.get('owner.screenname'),
                "u_pic": iteminfo.get('owner.avatar_60_url')
            }
            video_list.append(v_data)
        time.sleep(3)
        if len(video_list) < 100:
            # Fewer than a full page of keepers: no point fetching more pages.
            break
    return video_list


# Mapping from a task's `rn` to its proxy agent; refreshed each scheduling loop.
proxiesdict = db.get_proxy_agent_dict()


def search_worker(payload, kitem, flag):
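    """Run one keyword search inside the thread pool.

    Returns a tuple (ok, flag, payload, kitem, v_list); on failure ok is
    False so the scheduler can roll the payload back onto its queue.
    """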
    try:
        gproxies = proxiesdict[kitem['rn']]
        v_list = get_searchInfo(kitem['keyword'], kitem['level'], None, gproxies)
        if not v_list:
            # Two extra attempts with an increasing pause before giving up.
            for i in range(2):
                time.sleep(i * 5)
                v_list = get_searchInfo(kitem['keyword'], kitem['level'], None, gproxies)
                if v_list:
                    break
                time.sleep(2)
            if not v_list:
                v_list = []
        return True, flag, payload, kitem, v_list  # success
    except Exception as e:
        logger.exception(f"[Worker thread error] {kitem['keyword']} → {e}")
        traceback.print_exc()
        return False, flag, payload, kitem, []  # failure


def integrate_data_parallel():
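    """Main scheduling loop: lease keyword tasks in batches of 20, fan them
    out to the worker pool, upsert the collected videos, and roll back the
    payloads of failed tasks per level."""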
    global proxiesdict
    while True:
        proxiesdict = db.get_proxy_agent_dict()
        tasks, flag = db.item_keyword(20)
        if not tasks:
            time.sleep(10)
            continue

        futures = []
        for payload, kitem in tasks:
            futures.append(executor.submit(search_worker, payload, kitem, flag))
            time.sleep(3)  # stagger submissions so the proxies are not hit in a burst

        # Failed payloads are collected per level so each can be rolled back
        # onto the queue it was leased from.
        rollback = {0: [], 1: [], 2: []}

        for fut in concurrent.futures.as_completed(futures):
            ok, f_flag, payload, kitem, v_list = fut.result()

            if not ok:
                rollback[f_flag].append(payload)
                continue

            for item in v_list:
                DBSA.upsert_video({
                    "keyword": kitem["keyword"],
                    "v_name": kitem["v_name"],
                    "v_id": item["v_id"],
                    "v_xid": item["v_xid"],
                    "link": item["link"],
                    "title": item["title"],
                    "duration": format_duration(item["duration"]),
                    "fans": clean_dash_to_zero(item["fans"]),
                    "videos": clean_dash_to_zero(item["videos"]),
                    "watch_number": clean_dash_to_zero(item["view"]),
                    "create_time": format_create_time(item["createtime"]),
                    "cover_pic": item["pic"],
                    "index": item["index"],
                    "u_id": item["u_id"],
                    "u_xid": item["u_xid"],
                    "u_name": item["u_name"],
                    "u_pic": item["u_pic"],
                    "rn": kitem["rn"],
                    "batch": kitem["batch"],
                    "machine_id": MACHINE_ID,
                    "level": kitem["level"],
                })
        DBSA.flush()
        if rollback[0]:
            db.rollback_l0(rollback[0])
        if rollback[1]:
            db.rollback_l1(rollback[1])
        if rollback[2]:
            db.rollback_l2(rollback[2])


def parse_args() -> argparse.Namespace:
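    """Parse CLI flags and apply them to the MACHINE_ID / MAX_WORKERS globals."""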
    global MACHINE_ID, MAX_WORKERS

    parser = argparse.ArgumentParser(
        description="Configure worker settings."
    )
    parser.add_argument(
        "-m", "--machine-id",
        type=int,
        help=f"Machine identifier (default: {MACHINE_ID})"
    )
    parser.add_argument(
        "-w", "--max-workers",
        type=int,
        help=f"Maximum concurrent workers (default: {MAX_WORKERS})"
    )

    args = parser.parse_args()

    if args.machine_id is not None:
        MACHINE_ID = args.machine_id

    if args.max_workers is not None:
        if args.max_workers <= 0:
            parser.error("--max-workers must be greater than 0")
        MAX_WORKERS = args.max_workers
    if MACHINE_ID is None:
        raise ValueError("A machine id is required (-m/--machine-id)")
    return args


if __name__ == '__main__':
    parse_args()
    # Rebuild the pool in case --max-workers changed it from the default.
    executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
    start_time = datetime.now()
    logger.info(f"Start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
    integrate_data_parallel()
    end_time = datetime.now()
    duration = end_time - start_time
    logger.info(f"End time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}, elapsed: {duration}")