DailyMotion/main.py

import base64
import traceback
import argparse
import time
import concurrent.futures
import requests
from datetime import datetime
from DB import DBVidcon, DBSA
from dateutil import parser as date_parser
from concurrent.futures import ThreadPoolExecutor
from logger import logger

db = DBVidcon()
MACHINE_ID = None
MAX_WORKERS = 10
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
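

# Work out which shard ids this machine should claim: `take` consecutive
# ids starting at `offset`, validated against the total shard count
# `part_num`. The suggested offset for the next machine is printed so
# shards can be handed out sequentially, e.g. get_part_ids(8, 2, offset=4)
# returns [4, 5] and prints 6 as the next offset.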
def get_part_ids(part_num: int, take: int, offset: int = 0):
    part_ids = list(range(offset, offset + take))
    if max(part_ids) >= part_num:
        raise ValueError(f"Shard ids out of range: PART_IDS={part_ids} exceeds PART_NUM={part_num}")
    next_offset = offset + take
    if next_offset < part_num:
        print(f"[hint] The next machine's offset should be: {next_offset}")
    else:
        print(f"[hint] The current shards already cover the tail; no more machines are needed")
    return part_ids
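

# Normalize counter fields from the API: '-', '' and None all mean
# "no data" and are coerced to 0; anything else is cast to int.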
def clean_dash_to_zero(val):
    if val in ('-', '', None):
        return 0
    try:
        return int(val)
    except (ValueError, TypeError) as e:
        logger.exception(f"[field error] val = {val}: {e}")
        return 0
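

# Convert an ISO-8601 timestamp to "YYYY-MM-DD HH:MM:SS", falling back to
# the Unix epoch when the input cannot be parsed.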
def format_create_time(timestr):
    try:
        dt = date_parser.isoparse(timestr)
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except Exception as e:
        logger.exception(f"[time format error] {timestr}: {e}")
        return "1970-01-01 00:00:00"
def format_duration(seconds):
    try:
        seconds = int(seconds)
        return f"{seconds // 60:02}:{seconds % 60:02}"
    except Exception:
        return "00:00"
def get_searchInfo(keyword, level, headers, proxy_name, r=2):
    if r == 2:
        logger.info(f"NET processing -> {keyword},\trn -> {proxy_name},\tlevel -> {level}")
    video_list = []
    max_page = 2
    limit = 30
    endpoint = 'https://api.dailymotion.com/videos'
    if level == 0 or level == 1:
        max_page = 3
        limit = 100
    for j in range(1, max_page):
        params = {
            'search': keyword,
            'fields': 'id,title,created_time,thumbnail_240_url,duration,owner.id,owner.screenname,likes_total,views_total,owner.avatar_60_url,owner.followers_total,owner.videos_total',
            'limit': limit,
            'page': j,
            'sort': "relevance"
        }
        proxy_string = db.get_proxy(proxy_name)
        logger.info(f"proxy: {proxy_string}")
        proxies = {
            'http': proxy_string,
            'https': proxy_string,
        }
        try:
            # explicit timeout so a dead proxy cannot hang the worker
            response = requests.get(endpoint, params=params, proxies=proxies, timeout=30)
            jsondata = response.json()
        except Exception as e:
            if r < 0:
                logger.exception(f"[request] unhandled error: {e}, keyword: {keyword}, level: {level}")
                return None
            time.sleep((3 - r) * 5)
            return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
        resinfo = jsondata.get("list")
        if resinfo is None:
            if r < 0:
                logger.exception(f"[search API] unhandled response, failed to parse fields: {response.text}")
                return None
            time.sleep((3 - r) * 5)
            return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
        for index, iteminfo in enumerate(resinfo):
            calculated_index = index + 1 + (j - 1) * limit
            xid = iteminfo["id"]
            vid = base64.b64encode(f"Video:{xid}".encode('utf-8')).decode('utf-8')
            uxid = iteminfo["owner.id"]
            uid = base64.b64encode(f"Channel:{uxid}".encode('utf-8')).decode('utf-8')
            duration = iteminfo.get('duration')
            if duration is None or duration <= 300:
                continue
            v_data = {
                "index": calculated_index,
                "v_id": vid,
                "v_xid": xid,
                "link": "https://www.dailymotion.com/video/" + xid,
                "title": iteminfo.get("title"),
                "createtime": datetime.fromtimestamp(iteminfo.get("created_time")).strftime("%Y-%m-%d %H:%M:%S"),
                "duration": duration,
                "pic": iteminfo.get('thumbnail_240_url'),
                "view": iteminfo.get('views_total'),
                "fans": iteminfo.get('owner.followers_total'),
                "videos": iteminfo.get('owner.videos_total'),
                "u_id": uid,
                "u_xid": uxid,
                "u_name": iteminfo.get('owner.screenname'),
                "u_pic": iteminfo.get('owner.avatar_60_url')
            }
            video_list.append(v_data)
        time.sleep(2)
        if len(video_list) < 100:
            break
    return video_list
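

# Map each region name ("rn") to its proxy pool; the mapping is refreshed
# at the start of every batch in integrate_data_parallel(). search_worker
# runs in the thread pool: it retries an empty search result twice, and
# reports failure only when an exception escapes, so the task payload can
# be rolled back.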
proxiesdict = db.get_proxy_agent_dict()


def search_worker(payload, kitem, flag):
    try:
        gproxies = proxiesdict[kitem['rn']]
        v_list = get_searchInfo(kitem['keyword'], kitem['level'], None, gproxies)
        if not v_list:
            for i in range(2):
                time.sleep(i * 5)
                v_list = get_searchInfo(kitem['keyword'], kitem['level'], None, gproxies)
                if v_list:
                    break
                time.sleep(2)
        if not v_list:
            v_list = []
        return True, flag, payload, kitem, v_list  # success
    except Exception as e:
        logger.exception(f"[worker thread error] {kitem['keyword']}: {e}")
        traceback.print_exc()
        return False, flag, payload, kitem, []  # failure
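

# Main crawl loop: fetch a batch of 20 keyword tasks, fan them out to the
# thread pool, buffer the results through DBSA.upsert_video(), flush once
# the batch completes, and hand failed payloads back to db.rollback_l0/l1/l2
# (presumably re-queueing them for another attempt).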
def integrate_data_parallel():
    while True:
        global proxiesdict
        proxiesdict = db.get_proxy_agent_dict()
        tasks, flag = db.item_keyword(20)
        if not tasks:
            time.sleep(10)
            continue
        futures = []
        for payload, kitem in tasks:
            futures.append(executor.submit(search_worker, payload, kitem, flag))
            time.sleep(3)
        rollback = {0: [], 1: [], 2: []}
        for fut in concurrent.futures.as_completed(futures):
            ok, f_flag, payload, kitem, v_list = fut.result()
            if not ok:
                rollback[f_flag].append(payload)
                continue
            for item in v_list:
                if not item:
                    continue
                DBSA.upsert_video({
                    "keyword": kitem["keyword"],
                    "v_name": kitem["v_name"],
                    "v_id": item["v_id"],
                    "v_xid": item["v_xid"],
                    "link": item["link"],
                    "title": item["title"],
                    "duration": format_duration(item["duration"]),
                    "fans": clean_dash_to_zero(item["fans"]),
                    "videos": clean_dash_to_zero(item["videos"]),
                    "watch_number": clean_dash_to_zero(item["view"]),
                    "create_time": format_create_time(item["createtime"]),
                    "cover_pic": item["pic"],
                    "index": item["index"],
                    "u_id": item["u_id"],
                    "u_xid": item["u_xid"],
                    "u_name": item["u_name"],
                    "u_pic": item["u_pic"],
                    "rn": kitem["rn"],
                    "batch": kitem["batch"],
                    "machine_id": MACHINE_ID,
                    "level": kitem["level"],
                })
        DBSA.flush()
        if rollback[0]:
            db.rollback_l0(rollback[0])
        if rollback[1]:
            db.rollback_l1(rollback[1])
        if rollback[2]:
            db.rollback_l2(rollback[2])
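

# Parse CLI flags and update the module-level MACHINE_ID / MAX_WORKERS.
# A machine id is required, and the worker count must be positive.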
def parse_args() -> argparse.Namespace:
    global MACHINE_ID, MAX_WORKERS
    parser = argparse.ArgumentParser(
        description="Configure worker settings."
    )
    parser.add_argument(
        "-m", "--machine-id",
        type=int,
        help=f"Machine identifier (default: {MACHINE_ID})"
    )
    parser.add_argument(
        "-w", "--max-workers",
        type=int,
        help=f"Maximum concurrent workers (default: {MAX_WORKERS})"
    )
    args = parser.parse_args()
    if args.machine_id is not None:
        MACHINE_ID = args.machine_id
    if args.max_workers is not None:
        if args.max_workers <= 0:
            parser.error("--max-workers must be a positive integer")
        MAX_WORKERS = args.max_workers
    if MACHINE_ID is None:
        raise ValueError("A machine id must be specified (use -m/--machine-id)")
    return args
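

# Entry point: apply CLI settings, rebuild the executor with the requested
# worker count, and run the crawl loop. integrate_data_parallel() loops
# indefinitely, so the end-time bookkeeping below is only reached if it
# ever returns.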
if __name__ == '__main__':
    parse_args()
    executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
    start_time = datetime.now()
    logger.info(f"start time: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
    integrate_data_parallel()
    end_time = datetime.now()
    duration = end_time - start_time