DailyMotion/main.py


import random
from urllib.parse import quote
import argparse
import time
import uuid
import concurrent.futures
import requests
import datetime
from requests import RequestException
from DB import DBVidcon
from dateutil import parser as date_parser
import copy
from threading import Lock
db = DBVidcon()
MACHINE_ID = None
MAX_WORKERS = 10
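# Compute the shard (keyword-partition) IDs this machine is responsible for,
# and print the offset the next machine should start from.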
def get_part_ids(part_num: int, take: int, offset: int = 0):
part_ids = list(range(offset, offset + take))
    if max(part_ids) >= part_num:
        raise ValueError(f"Shard index out of range: PART_IDS={part_ids} exceeds PART_NUM={part_num}")
    next_offset = offset + take
    if next_offset < part_num:
        print(f"[Hint] The offset for the next machine should be: {next_offset}")
    else:
        print("[Hint] The current shards already reach the end; no more machines are needed")
return part_ids
def clean_dash_to_zero(val):
if val in ('-', '', None):
return 0
try:
return int(val)
except (ValueError, TypeError) as e:
print(f"[字段异常] val = {val}{str(e)}")
return 0
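# Normalize an ISO-8601 timestamp to "YYYY-MM-DD HH:MM:SS"; fall back to the epoch on parse errors.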
def format_create_time(timestr):
try:
dt = date_parser.isoparse(timestr)
return dt.strftime("%Y-%m-%d %H:%M:%S")
except Exception as e:
print(f"[时间格式错误] {timestr}{str(e)}")
return "1970-01-01 00:00:00"
def format_duration(seconds):
try:
seconds = int(seconds)
return f"{seconds // 60:02}:{seconds % 60:02}"
except Exception:
return "00:00"
headers1 = {
    'Accept': '*/*',
# 'Accept-Encoding': 'gzip, deflate, br',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
# 'Content-Length': '6237',
    'Content-Type': 'application/json',
'Host': 'graphql.api.dailymotion.com',
'Origin': 'https://www.dailymotion.com',
'Referer': 'https://www.dailymotion.com/',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-site',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
'X-DM-AppInfo-Id': 'com.dailymotion.neon',
'X-DM-AppInfo-Type': 'website',
'X-DM-AppInfo-Version': 'v2025-04-28T12:37:52.391Z',
'X-DM-Neon-SSR': '0',
'X-DM-Preferred-Country': 'us',
'accept-language': 'zh-CN',
'authorization': 'Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhaWQiOiJmMWEzNjJkMjg4YzFiOTgwOTljNyIsInJvbCI6ImNhbi1tYW5hZ2UtcGFydG5lcnMtcmVwb3J0cyBjYW4tcmVhZC12aWRlby1zdHJlYW1zIGNhbi1zcG9vZi1jb3VudHJ5IGNhbi1hZG9wdC11c2VycyBjYW4tcmVhZC1jbGFpbS1ydWxlcyBjYW4tbWFuYWdlLWNsYWltLXJ1bGVzIGNhbi1tYW5hZ2UtdXNlci1hbmFseXRpY3MgY2FuLXJlYWQtbXktdmlkZW8tc3RyZWFtcyBjYW4tZG93bmxvYWQtbXktdmlkZW9zIGFjdC1hcyBhbGxzY29wZXMgYWNjb3VudC1jcmVhdG9yIGNhbi1yZWFkLWFwcGxpY2F0aW9ucyIsInNjbyI6InJlYWQgd3JpdGUgZGVsZXRlIGVtYWlsIHVzZXJpbmZvIGZlZWQgbWFuYWdlX3ZpZGVvcyBtYW5hZ2VfY29tbWVudHMgbWFuYWdlX3BsYXlsaXN0cyBtYW5hZ2VfdGlsZXMgbWFuYWdlX3N1YnNjcmlwdGlvbnMgbWFuYWdlX2ZyaWVuZHMgbWFuYWdlX2Zhdm9yaXRlcyBtYW5hZ2VfbGlrZXMgbWFuYWdlX2dyb3VwcyBtYW5hZ2VfcmVjb3JkcyBtYW5hZ2Vfc3VidGl0bGVzIG1hbmFnZV9mZWF0dXJlcyBtYW5hZ2VfaGlzdG9yeSBpZnR0dCByZWFkX2luc2lnaHRzIG1hbmFnZV9jbGFpbV9ydWxlcyBkZWxlZ2F0ZV9hY2NvdW50X21hbmFnZW1lbnQgbWFuYWdlX2FuYWx5dGljcyBtYW5hZ2VfcGxheWVyIG1hbmFnZV9wbGF5ZXJzIG1hbmFnZV91c2VyX3NldHRpbmdzIG1hbmFnZV9jb2xsZWN0aW9ucyBtYW5hZ2VfYXBwX2Nvbm5lY3Rpb25zIG1hbmFnZV9hcHBsaWNhdGlvbnMgbWFuYWdlX2RvbWFpbnMgbWFuYWdlX3BvZGNhc3RzIiwibHRvIjoiZVdGV1JTSkdXRVZjVGg0eEYyRWpWblFlTHdrdUhTVjVPMGdrWGciLCJhaW4iOjEsImFkZyI6MSwiaWF0IjoxNzQ2MjU3NzI1LCJleHAiOjE3NDYyOTM1NjgsImRtdiI6IjEiLCJhdHAiOiJicm93c2VyIiwiYWRhIjoid3d3LmRhaWx5bW90aW9uLmNvbSIsInZpZCI6IjY0NjMzRDAzMDY1RjQxODZBRDBCMDI3Q0Y3OTVFRjBGIiwiZnRzIjo5MTE0MSwiY2FkIjoyLCJjeHAiOjIsImNhdSI6Miwia2lkIjoiQUY4NDlERDczQTU4NjNDRDdEOTdEMEJBQjA3MjI0M0IifQ.bMzShOLIb6datC92qGPTRVCW9eINTYDFwLtqed2P1d4',
'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'x-dm-visit-id': '1745971699160',
'x-dm-visitor-id': '64633D03065F4186AD0B027CF795EF0F',
}
_headers_cache = None  # cache of the most recently successful headers
_cache_lock = Lock()
Gproxies = None
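# Fetch a single dynamic proxy for region `g` from the kookeey API and return it
# as a requests-style proxies dict; retries on request or parse failures.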
def get_proxies(g):
url = "https://www.kookeey.com/pickdynamicips"
params = {
"auth": "pwd",
"format": "1",
"n": "1",
"p": "http",
"gate": "sea",
"g": g,
"r": "0",
"type": "json",
"sign": "10099426b05c7119e9c4dbd6a7a0aa4e",
"accessid": "2207189",
"dl": ","
}
try:
response = requests.get(url, params=params)
except RequestException:
return get_proxies(g)
try:
proxy_data = response.json()['data'][0]
except Exception:
print(g)
print("数据返回解析错误!" + str(response.text))
time.sleep(5)
return get_proxies(g)
proxies_url = f"http://{proxy_data['username']}:{proxy_data['password']}@{proxy_data['ip']}:{proxy_data['port']}"
proxies = {
"http": proxies_url,
"https": proxies_url,
}
return proxies
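# POST with exponential backoff and a per-attempt proxy lookup from the DB.
# Refreshes the OAuth token once on a 401 (or on the first failure) and
# returns None after the final attempt fails.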
def post_with_retry(url, proxy_name, json_payload=None, data=None, headers=None,
retries=5, timeout=10, backoff_factor=2, verbose=True):
token_refreshed = False
for attempt in range(1, retries + 1):
try:
proxy_str = db.get_proxy(proxy_name)
proxies = {"http": proxy_str, "https": proxy_str}
resp = requests.post(
url,
json=json_payload,
data=data,
headers=headers,
proxies=proxies,
timeout=timeout
)
            if resp.status_code == 401 and not token_refreshed:
                if verbose:
                    print("[post_with_retry] Got 401; refreshing token and retrying")
                headers = gettoken(proxy_name)
                token_refreshed = True
                continue
resp.raise_for_status()
return resp
except RequestException as e:
            if verbose:
                print(f"[{attempt}/{retries}] Request failed: {e}")
            # Refresh the token once if it has not been refreshed yet
            if not token_refreshed:
                if verbose:
                    print("[post_with_retry] Refreshing token before retrying")
                headers = gettoken(proxy_name)
                token_refreshed = True
                continue
if attempt == retries:
if verbose:
print(f"[post_with_retry] 最终失败:{url}")
return None
sleep_time = backoff_factor * (2 ** (attempt - 1))
if verbose:
print(f"[post_with_retry] 等待 {sleep_time}s 后重试…")
time.sleep(sleep_time)
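# Obtain an anonymous OAuth token via the Dailymotion client-credentials flow and
# build request headers from the headers1 template; falls back to the cached
# headers (or the bare template) after repeated failures.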
def gettoken(proxy, r=2):
global _headers_cache
headers = {
'Accept': '*/*',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Content-Type': 'application/x-www-form-urlencoded',
'Origin': 'https://www.dailymotion.com',
'Pragma': 'no-cache',
'Referer': 'https://www.dailymotion.com/',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-site',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36',
'sec-ch-ua': '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
}
u = uuid.uuid4()
uuid_with_dash = str(u)
uuid_no_dash = u.hex
traffic_segment = str(random.randint(100_000, 999_999))
data = {
'client_id': 'f1a362d288c1b98099c7',
'client_secret': 'eea605b96e01c796ff369935357eca920c5da4c5',
'grant_type': 'client_credentials',
'traffic_segment': traffic_segment,
'visitor_id': uuid_with_dash,
}
try:
proxy_str = db.get_proxy(proxy)
url = 'https://graphql.api.dailymotion.com/oauth/token'
response = requests.post(url, headers=headers, data=data, proxies={"http": proxy_str, "https": proxy_str})
token = response.json()['access_token']
copy_headers = copy.deepcopy(headers1)
copy_headers['authorization'] = "Bearer " + token
copy_headers['x-dm-visit-id'] = str(int(time.time() * 1000))
copy_headers['x-dm-visitor-id'] = uuid_no_dash
with _cache_lock:
_headers_cache = copy_headers
return copy_headers
except Exception as e:
print("[gettoken] 失败:", e)
if r > 0:
time.sleep(5)
return gettoken(proxy, r - 1)
else:
with _cache_lock:
if _headers_cache:
print("[gettoken] 用缓存 headers 兜底")
return copy.deepcopy(_headers_cache)
                # Still no token: fall back to the header template (without Authorization)
return copy.deepcopy(headers1)
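# Run the SEARCH_QUERY GraphQL request for a keyword, paging through the "stories"
# results, and return a list of video dicts (videos of 5 minutes or less are skipped).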
def get_searchInfo(keyword, level, headers, proxy_name):
video_list = []
max_page = 2
limit = 30
if level == 0 or level == 1:
max_page = 3
limit = 100
for j in range(1, max_page):
        # Do not unfold/reformat this payload
data = {
"operationName": "SEARCH_QUERY",
"variables": {
"query": keyword,
"shouldIncludeTopResults": True,
"shouldIncludeChannels": False,
"shouldIncludePlaylists": False,
"shouldIncludeHashtags": False,
"shouldIncludeVideos": False,
"shouldIncludeLives": False,
"page": j,
"limit": limit,
"recaptchaToken": None
},
"query": """
fragment VIDEO_BASE_FRAGMENT on Video {
id
xid
title
createdAt
duration
aspectRatio
thumbnail(height: PORTRAIT_240) {
id
url
__typename
}
creator {
id
xid
name
displayName
accountType
avatar(height: SQUARE_60) {
id
url
__typename
}
__typename
}
__typename
}
fragment CHANNEL_BASE_FRAG on Channel {
id
xid
name
displayName
accountType
isFollowed
avatar(height: SQUARE_120) {
id
url
__typename
}
followerEngagement {
id
followDate
__typename
}
metrics {
id
engagement {
id
followers {
edges {
node {
id
total
__typename
}
__typename
}
__typename
}
__typename
}
__typename
}
__typename
}
fragment PLAYLIST_BASE_FRAG on Collection {
id
xid
name
description
thumbnail(height: PORTRAIT_240) {
id
url
__typename
}
creator {
id
xid
name
displayName
accountType
avatar(height: SQUARE_60) {
id
url
__typename
}
__typename
}
metrics {
id
engagement {
id
videos(filter: {visibility: {eq: PUBLIC}}) {
edges {
node {
id
total
__typename
}
__typename
}
__typename
}
__typename
}
__typename
}
__typename
}
fragment HASHTAG_BASE_FRAG on Hashtag {
id
xid
name
metrics {
id
engagement {
id
videos {
edges {
node {
id
total
__typename
}
__typename
}
__typename
}
__typename
}
__typename
}
__typename
}
fragment LIVE_BASE_FRAGMENT on Live {
id
xid
title
audienceCount
aspectRatio
isOnAir
thumbnail(height: PORTRAIT_240) {
id
url
__typename
}
creator {
id
xid
name
displayName
accountType
avatar(height: SQUARE_60) {
id
url
__typename
}
__typename
}
__typename
}
query SEARCH_QUERY($query: String!, $shouldIncludeTopResults: Boolean!, $shouldIncludeVideos: Boolean!, $shouldIncludeChannels: Boolean!, $shouldIncludePlaylists: Boolean!, $shouldIncludeHashtags: Boolean!, $shouldIncludeLives: Boolean!, $page: Int, $limit: Int, $sortByVideos: SearchVideoSort, $durationMinVideos: Int, $durationMaxVideos: Int, $createdAfterVideos: DateTime, $recaptchaToken: String) {
search(token: $recaptchaToken) {
id
stories(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeTopResults) {
metadata {
id
algorithm {
uuid
__typename
}
__typename
}
pageInfo {
hasNextPage
nextPage
__typename
}
edges {
node {
...VIDEO_BASE_FRAGMENT
...CHANNEL_BASE_FRAG
...PLAYLIST_BASE_FRAG
...HASHTAG_BASE_FRAG
...LIVE_BASE_FRAGMENT
__typename
}
__typename
}
__typename
}
videos(
query: $query
first: $limit
page: $page
sort: $sortByVideos
durationMin: $durationMinVideos
durationMax: $durationMaxVideos
createdAfter: $createdAfterVideos
) @include(if: $shouldIncludeVideos) {
metadata {
id
algorithm {
uuid
__typename
}
__typename
}
pageInfo {
hasNextPage
nextPage
__typename
}
edges {
node {
id
...VIDEO_BASE_FRAGMENT
__typename
}
__typename
}
__typename
}
lives(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeLives) {
metadata {
id
algorithm {
uuid
__typename
}
__typename
}
pageInfo {
hasNextPage
nextPage
__typename
}
edges {
node {
id
...LIVE_BASE_FRAGMENT
__typename
}
__typename
}
__typename
}
channels(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeChannels) {
metadata {
id
algorithm {
uuid
__typename
}
__typename
}
pageInfo {
hasNextPage
nextPage
__typename
}
edges {
node {
id
...CHANNEL_BASE_FRAG
__typename
}
__typename
}
__typename
}
playlists: collections(query: $query, first: $limit, page: $page) @include(if: $shouldIncludePlaylists) {
metadata {
id
algorithm {
uuid
__typename
}
__typename
}
pageInfo {
hasNextPage
nextPage
__typename
}
edges {
node {
id
...PLAYLIST_BASE_FRAG
__typename
}
__typename
}
__typename
}
hashtags(query: $query, first: $limit, page: $page) @include(if: $shouldIncludeHashtags) {
metadata {
id
algorithm {
uuid
__typename
}
__typename
}
pageInfo {
hasNextPage
nextPage
__typename
}
edges {
node {
id
...HASHTAG_BASE_FRAG
__typename
}
__typename
}
__typename
}
__typename
}
}
"""
}
        response = post_with_retry(
            "https://graphql.api.dailymotion.com/",
            proxy_name,
            json_payload=data,
            headers=headers,
        )
        if response is None:
            continue
        jsondata = response.json()
try:
resinfo = jsondata['data']['search']['stories']['edges']
print('resinfo :', len(resinfo))
except Exception:
resinfo = []
print("[搜索接口]", response.text)
print("返回字段解析错误!")
for index, iteminfo in enumerate(resinfo):
calculated_index = index + 1 + (j - 1) * 100
node = iteminfo['node']
if node['__typename'] != "Video":
continue
creator = node['creator']
duration = node.get('duration')
            if not duration or duration <= 300:
continue
v_data = {
"index": calculated_index,
"v_id": node.get("id"),
"v_xid": node.get('xid'),
"link": "https://www.dailymotion.com/video/" + node.get('xid'),
"title": node.get("title"),
"createtime": node.get("createdAt"),
"duration": node.get("duration"),
"pic": node.get("thumbnail", {}).get("url"),
"view": 0,
"fans": 0,
"videos": 0,
"u_id": creator.get('id'),
"u_xid": creator.get('xid'),
"u_name": creator.get('name'),
"u_pic": node.get('thumbnail').get('url')
}
video_list.append(v_data)
return video_list
proxiesdict = db.get_proxy_agent_dict()
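# Worker routine (intended for use with the concurrent.futures pool): look up the
# proxy for the keyword's region, fetch a token, run the search with retries, and
# return (payload, kitem, v_list).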
def search_worker(payload, kitem):
try:
gproxies = proxiesdict[kitem['rn']]
header = gettoken(gproxies)
        v_list = get_searchInfo(kitem['keyword'], kitem['level'], header, gproxies)
if not v_list:
for i in range(2):
time.sleep(i * 5)
                v_list = get_searchInfo(kitem['keyword'], kitem['level'], header, gproxies)
if v_list:
break
time.sleep(2)
return payload, kitem, v_list
except Exception as e:
        raise RuntimeError(f"{kitem['keyword']} processing failed: {e}") from e
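# Main loop: pull keyword batches from the DB, search Dailymotion for each keyword,
# upsert the normalized video records, and roll back the unprocessed payloads of
# the current batch if an error occurs.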
def integrate_data():
while True:
keywords, flag = db.item_keyword()
if len(keywords) < 1:
time.sleep(30)
else:
for index, (payload, kitem) in enumerate(keywords):
try:
proxiesdict = db.get_proxy_agent_dict()
global Gproxies
Gproxies = proxiesdict[kitem['rn']]
                    headers = gettoken(Gproxies)
                    v_list = get_searchInfo(kitem['keyword'], kitem['level'], headers, Gproxies)
if not v_list:
for i in range(3):
time.sleep(i * 5)
v_list = get_searchInfo(kitem["keyword"], kitem['level'], headers)
if v_list:
break
time.sleep(2)
for item in v_list:
record = {
"keyword": kitem.get("keyword"),
"v_name": kitem.get("v_name"),
"v_id": item.get("v_id"),
"v_xid": item.get("v_xid"),
"link": item.get("link"),
"title": item.get("title"),
"duration": format_duration(item.get("duration")),
"fans": clean_dash_to_zero(item.get("fans", 0)),
"videos": clean_dash_to_zero(item.get("videos", 0)),
"watch_number": clean_dash_to_zero(item.get("view", 0)),
"create_time": format_create_time(item.get("createtime")),
"cover_pic": item.get("pic"),
"index": item.get("index", 0),
"u_id": item.get("u_id"),
"u_xid": item.get("u_xid"),
"u_name": item.get("u_name"),
"u_pic": item.get("u_pic"),
"rn": kitem.get("rn"),
"batch": kitem['batch'],
"machine_id": MACHINE_ID,
"level": kitem['level'],
}
db.upsert_video(record)
db.flush()
except Exception as e:
print(f"[异常] {str(e.__class__.__name__)}: {str(e)}")
print(f"[异常] 处理关键词 {kitem['keyword']} 时发生错误,正在回滚...")
time.sleep(5)
remaining_payloads = [p for p, _ in keywords[index:]]
if flag == 0:
db.rollback_l0(remaining_payloads)
elif flag == 1:
db.rollback_l1(remaining_payloads)
elif flag == 2:
db.rollback_l2(remaining_payloads)
time.sleep(5)
break
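# Parse CLI options and update the module-level MACHINE_ID / MAX_WORKERS settings;
# a machine ID is required.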
def parse_args() -> argparse.Namespace:
global MACHINE_ID, MAX_WORKERS
parser = argparse.ArgumentParser(
description="Configure worker settings."
)
parser.add_argument(
"-m", "--machine-id",
type=int,
help=f"Machine identifier (default: {MACHINE_ID})"
)
parser.add_argument(
"-w", "--max-workers",
type=int,
help=f"Maximum concurrent workers (default: {MAX_WORKERS})"
)
args = parser.parse_args()
if args.machine_id is not None:
MACHINE_ID = args.machine_id
if args.max_workers is not None:
if args.max_workers <= 0:
            parser.error("--max-workers must be a positive integer")
MAX_WORKERS = args.max_workers
if MACHINE_ID is None:
raise ValueError("请指定机器编号")
return args
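# Entry point: parse arguments, then run the scraping loop until it is interrupted.
# Example invocation (hypothetical values): python main.py -m 1 -w 10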
if __name__ == '__main__':
parse_args()
start_time = datetime.datetime.now()
print(f"开始时间:{start_time.strftime('%Y-%m-%d %H:%M:%S')}")
integrate_data()
    end_time = datetime.datetime.now()
    duration = end_time - start_time
    print(f"End time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}, elapsed: {duration}")