diff --git a/oneget.py b/oneget.py
index dbd9df7..90ab7bf 100644
--- a/oneget.py
+++ b/oneget.py
@@ -1,6 +1,6 @@
 import base64
 from datetime import datetime
-
+import concurrent.futures
 import requests
 import uuid
 import random
@@ -8,16 +8,57 @@ import time
 import copy
 from threading import Lock
 import logging
-from DB import DBVidcon
+from DB import DBVidcon, DBSA
 import json
 from requests.adapters import HTTPAdapter
 from urllib3.util.retry import Retry
+from dateutil import parser as date_parser
 
 MACHINE_ID = 3
 logger = logging.getLogger(__name__)
 db = DBVidcon()
 proxiesdict = db.get_proxy_agent_dict()
 
+
+class RetryRequests:
+    def __init__(
+            self,
+            proxies: dict = None,
+            timeout: int = 10,
+            total: int = 3,
+            backoff_factor: float = 1.0,
+            status_forcelist: tuple = (500, 502, 503, 504),
+            allowed_methods: tuple = ("GET", "POST"),
+    ):
+        self.session = requests.Session()
+        self.timeout = timeout
+        self.proxies = proxies
+
+        retry = Retry(
+            total=total,
+            backoff_factor=backoff_factor,
+            status_forcelist=status_forcelist,
+            allowed_methods=allowed_methods,
+            raise_on_status=False
+        )
+        adapter = HTTPAdapter(max_retries=retry)
+        self.session.mount("http://", adapter)
+        self.session.mount("https://", adapter)
+
+    def get(self, url, **kwargs):
+        kwargs.setdefault("timeout", self.timeout)
+        if self.proxies:
+            kwargs.setdefault("proxies", self.proxies)
+        return self.session.get(url, **kwargs)
+
+    def post(self, url, **kwargs):
+        kwargs.setdefault("timeout", self.timeout)
+        if self.proxies:
+            kwargs.setdefault("proxies", self.proxies)
+        return self.session.post(url, **kwargs)
+
+req = RetryRequests()
+
 def clean_dash_to_zero(val):
     if val in ('-', '', None):
         return 0
@@ -44,6 +85,7 @@ def format_duration(seconds):
     except Exception:
         return "00:00"
 
+
 class DMHeaderManager:
     _headers_template = {
         'Accept': '*/*, */*',
@@ -86,22 +128,6 @@ class DMHeaderManager:
         self._proxies = proxies
 
     def get_headers(self, retry: int = 2) -> dict:
-        for attempt in range(retry + 1):
-            try:
-                return self._generate_headers()
-            except Exception as e:
-                logger.warning(f"[get_headers] attempt {attempt + 1} failed: {e}")
-                time.sleep(2)
-
-        with self._cache_lock:
-            if self._headers_cache:
-                logger.info("[get_headers] using cached headers")
-                return copy.deepcopy(self._headers_cache)
-
-        logger.warning("[get_headers] falling back to base headers")
-        return copy.deepcopy(self._headers_template)
-
-    def _generate_headers(self) -> dict:
         visitor_id = str(uuid.uuid4())
         visit_id = str(int(time.time() * 1000))
         traffic_segment = str(random.randint(100_000, 999_999))
@@ -133,7 +159,7 @@ class DMHeaderManager:
             'visitor_id': visitor_id,
         }
 
-        response = requests.post(
+        response = req.post(
             'https://graphql.api.dailymotion.com/oauth/token',
             headers=token_headers,
             data=data,
@@ -155,32 +181,11 @@ class DMHeaderManager:
 
         return new_headers
 
-
 class DMVideoInfo:
     def __init__(self, proxies: dict = None, max_retries: int = 3, backoff_factor: float = 0.5):
         self.proxies = proxies
         self.max_retries = max_retries
         self.backoff_factor = backoff_factor
-        self.session = self._create_session()
-
-    def _create_session(self):
-        session = requests.Session()
-        retry = Retry(
-            total=self.max_retries,
-            connect=self.max_retries,
-            read=self.max_retries,
-            backoff_factor=self.backoff_factor,
-            status_forcelist=[500, 502, 503, 504],
-            allowed_methods=["GET"]
-        )
-        adapter = HTTPAdapter(max_retries=retry)
-        session.mount("http://", adapter)
-        session.mount("https://", adapter)
-
-        if self.proxies:
-            session.proxies.update(self.proxies)
-
-        return session
 
     def get_video_info(self, data: dict) -> dict:
         v_xid = data.get('v_xid')
@@ -192,25 +197,37 @@ class DMVideoInfo:
         }
 
         try:
-            resp = self.session.get(url, params=params, timeout=10)
+            resp = req.get(url, params=params, timeout=10)
             resp.raise_for_status()
             r_data = resp.json()
             xid = r_data["id"]
             vid = base64.b64encode(f"Video:{xid}".encode('utf-8')).decode('utf-8')
             uxid = r_data["owner.id"]
             uid = base64.b64encode(f"Channel:{uxid}".encode('utf-8')).decode('utf-8')
+            duration = r_data.get("duration", 0)
+            if duration < 30:
+                return None
             data["v_id"] = vid
-            data["v_title"] = r_data["title"]
-            data["link"] = "https://www.dailymotion.com/video/" + xid,
-            data["duration"] = r_data["duration"]
-            data['createdtime'] = datetime.fromtimestamp(r_data.get("created_time")).strftime("%Y-%m-%d %H:%M:%S"),
-            data['']
+            data["title"] = r_data.get("title", "")
+            data["link"] = "https://www.dailymotion.com/video/" + xid
+            data["duration"] = format_duration(duration)
+            data['create_time'] = datetime.fromtimestamp(
+                r_data.get("created_time")).strftime("%Y-%m-%d %H:%M:%S")
+            data['fans'] = clean_dash_to_zero(r_data.get("owner.followers_total", 0))
+            data['videos'] = clean_dash_to_zero(r_data.get("owner.videos_total", 0))
+            data['watch_number'] = clean_dash_to_zero(r_data.get("views_total", 0))
+            data['cover_pic'] = r_data.get('thumbnail_240_url')
+            data['u_id'] = uid
+            data['u_xid'] = uxid
+            data['u_name'] = r_data.get("owner.screenname", "")
+            data['u_pic'] = r_data.get("owner.avatar_60_url", "")
+            DBSA.upsert_video(data)
+            DBSA.flush()
         except requests.RequestException as e:
            print(f"[ERROR] request failed vxid={v_xid} : {e}")
            return None
 
 
-
 def main():
     kwdata = db.get_web_items()
     if not kwdata:
@@ -220,15 +237,15 @@ def main():
     kwdata = kwdata[0][1]
     rn = kwdata['rn']
     proxy_name = proxiesdict.get(rn)
-    proxies_str = db.get_proxy(proxy_name, '-1')
+    proxies_str = "http://127.0.0.1:10808"
+    # proxies_str = db.get_proxy(proxy_name, '-1')
     proxies = {
         'http': proxies_str,
         'https': proxies_str
     }
     kw = kwdata['keyword']
-
     dmheader_manager = DMHeaderManager(proxies=proxies)
-
+    dmvideo_info = DMVideoInfo(proxies=proxies)
     headers = dmheader_manager.get_headers()
     for i in range(1, 11):
         data = {
@@ -600,17 +617,16 @@ def main():
 
         payload = json.dumps(data).encode()
 
-        response = requests.post('https://graphql.api.dailymotion.com/', headers=headers, data=payload,
+        response = req.post('https://graphql.api.dailymotion.com/', headers=headers, data=payload,
                                  proxies=proxies)
         data = response.json()
         edges = data['data']['search']['stories']['edges']
         edges_len = len(edges)
 
-        dm_video_info = DMVideoInfo(proxies=proxies)
         tancks = []
         for j, edge in enumerate(edges):
             node = edge.get("node", {})
-            tancks.append({
+            s_data = {
                 "keyword": kw,
                 "v_name": kwdata.get("v_name", ""),
                 "v_xid": node.get("xid"),
@@ -619,7 +635,14 @@ def main():
                 "machine_id": MACHINE_ID,
                 "index": (i - 1) * 20 + j + 1,
                 "level": 0,
-            })
-
+            }
+            tancks.append(s_data)
+        # fan the per-video lookups out across a pool of 20 worker threads
+        with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
+            executor.map(dmvideo_info.get_video_info, tancks)
         if edges_len < 20:
             break
+
+
+if __name__ == '__main__':
+    main()
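
Note on the new RetryRequests helper: get() and post() fill in timeout and proxies via kwargs.setdefault, so the constructor values act only as defaults and any per-call argument wins, while the mounted HTTPAdapter transparently retries 500/502/503/504 responses with exponential backoff. A minimal usage sketch (the URL and params below are illustrative, not part of this patch):

    # one-off call with a longer timeout; the shared session's retry
    # policy (total=3, backoff_factor=1.0) still applies
    resp = req.get("https://api.dailymotion.com/videos",
                   params={"limit": 1}, timeout=30)
    resp.raise_for_status()
    print(resp.json())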
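
Note on the thread-pool fan-out at the end of main(): executor.map returns a lazy iterator whose results are never consumed here, so an exception raised inside get_video_info (for example a KeyError on r_data["id"], which its except requests.RequestException clause does not catch) is silently discarded when the with block exits. A sketch of one way to surface worker errors, reusing the patch's tancks and dmvideo_info names (the log wording is illustrative):

    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        futures = [executor.submit(dmvideo_info.get_video_info, t) for t in tancks]
        for fut in concurrent.futures.as_completed(futures):
            try:
                fut.result()  # re-raises any exception from the worker thread
            except Exception as e:
                logger.warning(f"get_video_info worker failed: {e}")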