332 lines
16 KiB
Python

import json
import random
import time
import uuid
import concurrent.futures
import logging
from random import uniform
from datetime import datetime
from typing import Dict, List, Optional, Union
import pandas as pd
import requests
import os
import urllib3
from requests import RequestException
from fake_useragent import UserAgent
# 配置日志记录
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('dailymotion.log', encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# 禁用 SSL 警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 基础配置
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
KW_PATH = os.path.join(BASE_DIR, 'data', 'keyword.xlsx')
OUTPUT_DIR = os.path.join(BASE_DIR, 'out_put_CNTW')
# 创建输出目录
if not os.path.exists(OUTPUT_DIR):
os.makedirs(OUTPUT_DIR)
logger.info(f'创建输出目录: {OUTPUT_DIR}')
# 请求配置
MAX_RETRIES = 3
BASE_DELAY = 2
MAX_WORKERS = 5 # 并发线程数限制
REQUEST_TIMEOUT = 30 # 请求超时时间
class DailymotionAPI:
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
'Accept': '*/*, */*',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Content-Type': 'application/json, application/json',
'Host': 'graphql.api.dailymotion.com',
'Origin': 'https://www.dailymotion.com',
'Referer': 'https://www.dailymotion.com/',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-site',
'X-DM-AppInfo-Id': 'com.dailymotion.neon',
'X-DM-AppInfo-Type': 'website',
'X-DM-AppInfo-Version': 'v2025-04-28T12:37:52.391Z',
'X-DM-Neon-SSR': '0',
'X-DM-Preferred-Country': 'us',
'accept-language': 'zh-CN',
})
self.session.proxies = {
"http": 'http://127.0.0.1:7890',
"https": 'http://127.0.0.1:7890',
}
def _make_request(self, url: str, json_data: Dict, retries: int = MAX_RETRIES) -> Dict:
"""发送请求并处理响应
Args:
url: 请求URL
json_data: 请求数据
retries: 重试次数
Returns:
Dict: 响应数据
"""
for attempt in range(retries):
try:
self.session.headers['User-Agent'] = UserAgent().random
response = self.session.post(
url,
json=json_data,
timeout=REQUEST_TIMEOUT,
verify=False
)
response.raise_for_status()
return response.json()
except Exception as e:
if attempt == retries - 1:
logger.error(f'请求失败: {str(e)}')
raise
wait_time = BASE_DELAY * (2 ** attempt) + uniform(1, 3)
logger.warning(f'请求失败,等待 {wait_time:.2f} 秒后重试...')
time.sleep(wait_time)
def get_video_info(self, x_id: str) -> Dict[str, Union[int, str]]:
"""获取视频详细信息
Args:
x_id: 视频ID
Returns:
Dict: 包含视频统计信息的字典
"""
try:
payload = {
"operationName": "WATCHING_VIDEO",
"variables": {"xid": x_id, "isSEO": False},
"query": "fragment VIDEO_FRAGMENT on Video {\n id\n xid\n isPublished\n duration\n title\n description\n thumbnailx60: thumbnailURL(size: \"x60\")\n thumbnailx120: thumbnailURL(size: \"x120\")\n thumbnailx240: thumbnailURL(size: \"x240\")\n thumbnailx360: thumbnailURL(size: \"x360\")\n thumbnailx480: thumbnailURL(size: \"x480\")\n thumbnailx720: thumbnailURL(size: \"x720\")\n thumbnailx1080: thumbnailURL(size: \"x1080\")\n aspectRatio\n category\n categories(filter: {category: {eq: CONTENT_CATEGORY}}) {\n edges {\n node { id name slug __typename }\n __typename\n }\n __typename\n }\n iab_categories: categories(\n filter: {category: {eq: IAB_CATEGORY}, percentage: {gte: 70}}\n ) {\n edges {\n node { id slug __typename }\n __typename\n }\n __typename\n }\n bestAvailableQuality\n createdAt\n viewerEngagement {\n id\n liked\n favorited\n __typename\n }\n isPrivate\n isWatched\n isCreatedForKids\n isExplicit\n canDisplayAds\n videoWidth: width\n videoHeight: height\n status\n hashtags {\n edges {\n node { id name __typename }\n __typename\n }\n __typename\n }\n stats {\n id\n views { id total __typename }\n __typename\n }\n channel {\n __typename\n id\n xid\n name\n displayName\n isArtist\n logoURLx25: logoURL(size: \"x25\")\n logoURL(size: \"x60\")\n isFollowed\n accountType\n coverURLx375: coverURL(size: \"x375\")\n stats {\n id\n views { id total __typename }\n followers { id total __typename }\n videos { id total __typename }\n __typename\n }\n country { id codeAlpha2 __typename }\n organization @skip(if: $isSEO) {\n id\n xid\n owner { id xid __typename }\n __typename\n }\n }\n language { id codeAlpha2 __typename }\n tags {\n edges {\n node { id label __typename }\n __typename\n }\n __typename\n }\n moderation { id reviewedAt __typename }\n topics(whitelistedOnly: true, first: 3, page: 1) {\n edges {\n node {\n id\n xid\n name\n names {\n edges {\n node {\n id\n name\n language { id codeAlpha2 __typename }\n __typename\n }\n __typename\n }\n __typename\n }\n __typename\n }\n __typename\n }\n __typename\n }\n geoblockedCountries {\n id\n allowed\n denied\n __typename\n }\n transcript {\n edges {\n node { id timecode text __typename }\n __typename\n }\n __typename\n }\n __typename\n}\n\nfragment LIVE_FRAGMENT on Live {\n id\n xid\n startAt\n endAt\n isPublished\n title\n description\n thumbnailx60: thumbnailURL(size: \"x60\")\n thumbnailx120: thumbnailURL(size: \"x120\")\n thumbnailx240: thumbnailURL(size: \"x240\")\n thumbnailx360: thumbnailURL(size: \"x360\")\n thumbnailx480: thumbnailURL(size: \"x480\")\n thumbnailx720: thumbnailURL(size: \"x720\")\n thumbnailx1080: thumbnailURL(size: \"x1080\")\n aspectRatio\n category\n createdAt\n viewerEngagement { id liked favorited __typename }\n isPrivate\n isExplicit\n isCreatedForKids\n bestAvailableQuality\n canDisplayAds\n videoWidth: width\n videoHeight: height\n stats { id views { id total __typename } __typename }\n channel {\n __typename\n id\n xid\n name\n displayName\n isArtist\n logoURLx25: logoURL(size: \"x25\")\n logoURL(size: \"x60\")\n isFollowed\n accountType\n coverURLx375: coverURL(size: \"x375\")\n stats { id views { id total __typename } followers { id total __typename } videos { id total __typename } __typename }\n country { id codeAlpha2 __typename }\n organization @skip(if: $isSEO) { id xid owner { id xid __typename } __typename }\n }\n language { id codeAlpha2 __typename }\n tags { edges { node { id label __typename } __typename } __typename }\n moderation { id reviewedAt __typename }\n topics(whitelistedOnly: true, first: 3, page: 1) {\n edges { node { id xid name names { edges { node { id name language { id codeAlpha2 __typename } __typename } __typename } __typename } __typename } __typename }\n __typename\n }\n geoblockedCountries { id allowed denied __typename }\n __typename\n}\n\nquery WATCHING_VIDEO($xid: String!, $isSEO: Boolean!) {\n video: media(xid: $xid) {\n __typename\n ... on Video { id ...VIDEO_FRAGMENT __typename }\n ... on Live { id ...LIVE_FRAGMENT __typename }\n }\n}"
}
response_data = self._make_request('https://graphql.api.dailymotion.com/', payload)
v_info = response_data['data']['video']['channel']['stats']
return {
"view": v_info['views']['total'],
"fans": v_info['followers']['total'],
"videos": v_info['videos']['total'],
}
except Exception as e:
logger.error(f'获取视频信息失败: {str(e)}')
return {"view": '-', "fans": '-', "videos": '-'}
def process_video(api: DailymotionAPI, node: Dict, calculated_index: int) -> Optional[Dict]:
"""处理单个视频信息
Args:
api: DailymotionAPI实例
node: 视频节点数据
calculated_index: 计算的索引
Returns:
Optional[Dict]: 处理后的视频信息
"""
xid = node.get('xid')
try:
logger.info(f'开始处理视频 {xid} (索引: {calculated_index})')
# 添加随机延迟避免请求过于频繁
time.sleep(uniform(1, 2))
v_info = api.get_video_info(xid)
result = {
"index": calculated_index,
"id": node.get('id'),
"xid": xid,
"link": f"https://www.dailymotion.com/video/{xid}",
"title": node.get('title'),
"createtime": node.get('createdAt'),
"duration": node.get('duration'),
"pic": node.get('thumbnail', {}).get('url'),
"view": v_info['view'],
"fans": v_info['fans'],
"videos": v_info['videos']
}
logger.debug(f'视频 {xid} 处理成功')
return result
except Exception as e:
logger.error(f'处理视频 {xid} 出错: {str(e)}')
return None
def process_videos_batch(api: DailymotionAPI, videos: List[Dict], start_index: int) -> List[Dict]:
"""批量处理视频信息
Args:
api: DailymotionAPI实例
videos: 视频列表
start_index: 起始索引
Returns:
List[Dict]: 处理后的视频信息列表
"""
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_video = {executor.submit(process_video, api, video, i): (video, i)
for i, video in enumerate(videos, start=start_index)}
for future in concurrent.futures.as_completed(future_to_video):
video, index = future_to_video[future]
try:
result = future.result()
if result:
results.append(result)
except Exception as e:
logger.error(f'处理视频失败 (索引: {index}): {str(e)}')
return results
def save_results(results: List[Dict], output_file: str):
"""保存处理结果
Args:
results: 处理结果列表
output_file: 输出文件路径
"""
try:
df = pd.DataFrame(results)
df.to_excel(output_file, index=False, engine='openpyxl')
logger.info(f'结果已保存到: {output_file}')
except Exception as e:
logger.error(f'保存结果失败: {str(e)}')
def search_videos(api: DailymotionAPI, keyword: str, page: int = 1) -> List[Dict]:
"""搜索视频列表
Args:
api: DailymotionAPI实例
keyword: 搜索关键词
page: 页码
Returns:
List[Dict]: 视频列表
"""
try:
payload = {
"operationName": "SEARCH_VIDEOS",
"variables": {
"query": keyword,
"page": page,
"limit": 20,
"sort": "relevance"
},
"query": "query SEARCH_VIDEOS($query: String!, $page: Int!, $limit: Int!, $sort: String!) {\n videos(\n first: $limit\n page: $page\n search: {query: $query, sort: $sort}\n ) {\n pageInfo { hasNextPage currentPage __typename }\n edges {\n node {\n id\n xid\n title\n createdAt\n duration\n thumbnail { url __typename }\n __typename\n }\n __typename\n }\n __typename\n }\n}"
}
response = api._make_request('https://graphql.api.dailymotion.com/', payload)
videos = response['data']['videos']['edges']
return [video['node'] for video in videos]
except Exception as e:
logger.error(f'搜索视频失败: {str(e)}')
return []
def load_progress(keyword: str) -> Dict:
"""加载进度信息
Args:
keyword: 关键词
Returns:
Dict: 进度信息
"""
progress_file = os.path.join(OUTPUT_DIR, f'{keyword}_progress.json')
if os.path.exists(progress_file):
try:
with open(progress_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.error(f'加载进度失败: {str(e)}')
return {'page': 1, 'video_data': [], 'user_data': []}
def save_progress(keyword: str, progress: Dict):
"""保存进度信息
Args:
keyword: 关键词
progress: 进度信息
"""
progress_file = os.path.join(OUTPUT_DIR, f'{keyword}_progress.json')
try:
with open(progress_file, 'w', encoding='utf-8') as f:
json.dump(progress, f)
except Exception as e:
logger.error(f'保存进度失败: {str(e)}')
def main():
"""主函数"""
try:
# 读取关键词
df = pd.read_excel(KW_PATH)
if '搜索词' in df.columns:
keywords = df['搜索词'].tolist()
elif 'keyword' in df.columns:
keywords = df['keyword'].tolist()
else:
raise ValueError('Excel文件中未找到列名"搜索词""keyword",请检查文件格式')
api = DailymotionAPI()
for keyword in keywords:
logger.info(f'开始处理关键词: {keyword}')
# 加载进度
progress = load_progress(keyword)
current_page = progress['page']
video_data = progress['video_data']
try:
while True:
# 搜索视频
videos = search_videos(api, keyword, current_page)
if not videos:
break
# 处理视频信息
results = process_videos_batch(api, videos, len(video_data))
video_data.extend(results)
# 保存进度
progress['page'] = current_page
progress['video_data'] = video_data
save_progress(keyword, progress)
logger.info(f'已处理 {len(video_data)} 个视频')
current_page += 1
# 保存结果
if video_data:
output_file = os.path.join(OUTPUT_DIR, f'{keyword}_results.xlsx')
save_results(video_data, output_file)
except Exception as e:
logger.error(f'处理关键词 {keyword} 出错: {str(e)}')
continue
except Exception as e:
logger.error(f'程序执行出错: {str(e)}')
finally:
logger.info('程序执行完成')
if __name__ == '__main__':
main()