feat: 添加重试机制和并发处理以优化视频信息请求

This commit is contained in:
晓丰 2025-07-17 16:38:18 +08:00
parent b512b05e8b
commit d1307039c8

133
oneget.py
View File

@ -1,6 +1,6 @@
import base64
from datetime import datetime
import concurrent.futures
import requests
import uuid
import random
@ -8,16 +8,57 @@ import time
import copy
from threading import Lock
import logging
from DB import DBVidcon
from DB import DBVidcon, DBSA
import json
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from dateutil import parser as date_parser
MACHINE_ID = 3
logger = logging.getLogger(__name__)
db = DBVidcon()
proxiesdict = db.get_proxy_agent_dict()
class RetryRequests:
    """Thin wrapper around requests.Session with automatic retry/backoff.

    A urllib3 ``Retry`` policy is mounted on both ``http://`` and
    ``https://`` so every GET/POST issued through this object is retried
    on transient 5xx responses with exponential backoff.
    """

    def __init__(
        self,
        proxies: dict = None,
        timeout: int = 10,
        total: int = 3,
        backoff_factor: float = 1.0,
        status_forcelist: tuple = (500, 502, 503, 504),
        allowed_methods: tuple = ("GET", "POST"),
    ):
        # Default timeout/proxies are applied per-request unless overridden.
        self.session = requests.Session()
        self.timeout = timeout
        self.proxies = proxies
        retry_policy = Retry(
            total=total,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
            allowed_methods=allowed_methods,
            raise_on_status=False,  # let callers decide via raise_for_status()
        )
        http_adapter = HTTPAdapter(max_retries=retry_policy)
        for scheme in ("http://", "https://"):
            self.session.mount(scheme, http_adapter)

    def _merge_defaults(self, kwargs):
        # Fill in timeout/proxies only when the caller did not supply them.
        kwargs.setdefault("timeout", self.timeout)
        if self.proxies:
            kwargs.setdefault("proxies", self.proxies)
        return kwargs

    def get(self, url, **kwargs):
        """GET *url* through the retrying session."""
        return self.session.get(url, **self._merge_defaults(kwargs))

    def post(self, url, **kwargs):
        """POST to *url* through the retrying session."""
        return self.session.post(url, **self._merge_defaults(kwargs))
# Module-level shared HTTP client; all request helpers below route through
# this single retrying session (no proxies by default — callers pass their own).
req = RetryRequests()
def clean_dash_to_zero(val):
if val in ('-', '', None):
return 0
@ -44,6 +85,7 @@ def format_duration(seconds):
except Exception:
return "00:00"
class DMHeaderManager:
_headers_template = {
'Accept': '*/*, */*',
@ -86,22 +128,6 @@ class DMHeaderManager:
self._proxies = proxies
def get_headers(self, retry: int = 2) -> dict:
    """Return a usable headers dict, degrading gracefully on failure.

    Tries to generate fresh headers up to ``retry + 1`` times (sleeping 2s
    between attempts); on total failure falls back to the last cached
    headers, and finally to the static header template.

    Returns:
        dict: a deep copy, so callers may mutate it freely.
    """
    for attempt in range(retry + 1):
        try:
            return self._generate_headers()
        except Exception as e:
            # Broad catch is deliberate: any generation failure should fall
            # through to the cache/template fallbacks below.
            logger.warning(f"[get_headers] 第 {attempt + 1} 次尝试失败: {e}")
            time.sleep(2)
    with self._cache_lock:
        if self._headers_cache:
            logger.info("[get_headers]")
            return copy.deepcopy(self._headers_cache)
    # NOTE(review): rendered source lost indentation — this final fallback is
    # assumed to sit outside the lock; confirm against the original file.
    logger.warning("[get_headers] 基础 headers")
    return copy.deepcopy(self._headers_template)
def _generate_headers(self) -> dict:
visitor_id = str(uuid.uuid4())
visit_id = str(int(time.time() * 1000))
traffic_segment = str(random.randint(100_000, 999_999))
@ -133,7 +159,7 @@ class DMHeaderManager:
'visitor_id': visitor_id,
}
response = requests.post(
response = req.post(
'https://graphql.api.dailymotion.com/oauth/token',
headers=token_headers,
data=data,
@ -155,32 +181,11 @@ class DMHeaderManager:
return new_headers
class DMVideoInfo:
def __init__(self, proxies: dict = None, max_retries: int = 3, backoff_factor: float = 0.5):
self.proxies = proxies
self.max_retries = max_retries
self.backoff_factor = backoff_factor
self.session = self._create_session()
def _create_session(self):
session = requests.Session()
retry = Retry(
total=self.max_retries,
connect=self.max_retries,
read=self.max_retries,
backoff_factor=self.backoff_factor,
status_forcelist=[500, 502, 503, 504],
allowed_methods=["GET"]
)
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
if self.proxies:
session.proxies.update(self.proxies)
return session
def get_video_info(self, data: dict) -> dict:
v_xid = data.get('v_xid')
@ -192,25 +197,37 @@ class DMVideoInfo:
}
try:
resp = self.session.get(url, params=params, timeout=10)
resp = req.get(url, params=params, timeout=10)
resp.raise_for_status()
r_data = resp.json()
xid = r_data["id"]
vid = base64.b64encode(f"Video:{xid}".encode('utf-8')).decode('utf-8')
uxid = r_data["owner.id"]
uid = base64.b64encode(f"Channel:{uxid}".encode('utf-8')).decode('utf-8')
duration = r_data.get("duration", 0)
if duration < 30:
return None
data["v_id"] = vid
data["v_title"] = r_data["title"]
data["link"] = "https://www.dailymotion.com/video/" + xid,
data["duration"] = r_data["duration"]
data['createdtime'] = datetime.fromtimestamp(r_data.get("created_time")).strftime("%Y-%m-%d %H:%M:%S"),
data['']
data["title"] = r_data.get("title", "")
data["link"] = "https://www.dailymotion.com/video/" + xid
data["duration"] = format_duration(r_data.get("duration", 0))
data['create_time'] = format(
datetime.fromtimestamp(r_data.get("created_time")).strftime("%Y-%m-%d %H:%M:%S"))
data['fans'] = clean_dash_to_zero(r_data.get("owner.followers_total", 0))
data['videos'] = clean_dash_to_zero(r_data.get("owner.videos_total", 0))
data['watch_number'] = clean_dash_to_zero(r_data.get("views_total", 0))
data['cover_pic'] = r_data.get('thumbnail_240_url')
data['u_id'] = uid
data['u_xid'] = uxid
data['u_name'] = r_data.get("owner.screenname", "")
data['u_pic'] = r_data.get("owner.avatar_60_url", "")
DBSA.upsert_video(data)
DBSA.flush()
except requests.RequestException as e:
print(f"[ERROR] 请求失败 vxid={v_xid} : {e}")
return None
def main():
kwdata = db.get_web_items()
if not kwdata:
@ -220,15 +237,15 @@ def main():
kwdata = kwdata[0][1]
rn = kwdata['rn']
proxy_name = proxiesdict.get(rn)
proxies_str = db.get_proxy(proxy_name, '-1')
proxies_str = "http://127.0.0.1:10808"
# proxies_str = db.get_proxy(proxy_name, '-1')
proxies = {
'http': proxies_str,
'https': proxies_str
}
kw = kwdata['keyword']
dmheader_manager = DMHeaderManager(proxies=proxies)
dmvideo_info = DMVideoInfo(proxies=proxies)
headers = dmheader_manager.get_headers()
for i in range(1, 11):
data = {
@ -600,17 +617,16 @@ def main():
payload = json.dumps(data).encode()
response = requests.post('https://graphql.api.dailymotion.com/', headers=headers, data=payload,
response = req.post('https://graphql.api.dailymotion.com/', headers=headers, data=payload,
proxies=proxies)
data = response.json()
edges = data['data']['search']['stories']['edges']
edges_len = len(edges)
dm_video_info = DMVideoInfo(proxies=proxies)
tancks = []
for j, edge in enumerate(edges):
node = edge.get("node", {})
tancks.append({
s_data = {
"keyword": kw,
"v_name": kwdata.get("v_name", ""),
"v_xid": node.get("xid"),
@ -619,7 +635,14 @@ def main():
"machine_id": MACHINE_ID,
"index": (i - 1) * 20 + j + 1,
"level": 0,
})
}
tancks.append(s_data)
# Fan the per-video info requests out over a 20-worker thread pool.
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
executor.map(dmvideo_info.get_video_info, tancks)
if edges_len < 20:
break
if __name__ == '__main__':
main()