diff --git a/DB.py b/DB.py index 61ee9ae..84e6b48 100644 --- a/DB.py +++ b/DB.py @@ -29,51 +29,51 @@ _engine = create_engine( _meta = MetaData() # 操作记录表 video_op = Table("sh_dm_video_op_v3", _meta, - Column("id", Integer, primary_key=True, autoincrement=True), - Column("v_id", String(64)), - Column("v_xid", String(64)), - Column("a_id", Integer, default=0), - Column("level", Integer), - Column("name_title", String(100)), - Column("keyword", String(100)), - Column("rn", String(50)), - Column("history_status", String(100)), - Column("is_repeat", Integer), - Column("is_piracy", String(2), default='3'), - Column("sort", Integer), - Column("createtime", Integer), - Column("updatetime", Integer), - Column("operatetime", Integer), - Column("batch", Integer), - Column("machine", Integer), - Column("ts_status", Integer, default=1), -) + Column("id", Integer, primary_key=True, autoincrement=True), + Column("v_id", String(64)), + Column("v_xid", String(64)), + Column("a_id", Integer, default=0), + Column("level", Integer), + Column("name_title", String(100)), + Column("keyword", String(100)), + Column("rn", String(50)), + Column("history_status", String(100)), + Column("is_repeat", Integer), + Column("is_piracy", String(2), default='3'), + Column("sort", Integer), + Column("createtime", Integer), + Column("updatetime", Integer), + Column("operatetime", Integer), + Column("batch", Integer), + Column("machine", Integer), + Column("ts_status", Integer, default=1), + ) # 视频表 video = Table("sh_dm_video_v3", _meta, - Column("id", Integer, primary_key=True, autoincrement=True), - Column("v_id", String(64)), - Column("v_name", String(100), nullable=False), - Column("v_xid", String(64), nullable=False), - Column("title", String(255), nullable=False), - Column("link", String(255), nullable=False), - Column("edition", String(255), default=''), - Column("duration", String(11), default='0'), - Column("watch_number", Integer, default=0), - Column("follow_number", Integer, default=0), - Column("video_number", Integer, default=0), - Column("public_time", DateTime), - Column("cover_pic", String(255)), - Column("sort", Integer), - Column("u_xid", String(64)), - Column("u_id", String(100)), - Column("u_pic", String(255)), - Column("u_name", String(255)), - Column("status", Integer, default=1), - Column("createtime", Integer, default=0), - Column("updatetime", Integer, default=0), - Column("operatetime", Integer, default=0), -) + Column("id", Integer, primary_key=True, autoincrement=True), + Column("v_id", String(64)), + Column("v_name", String(100), nullable=False), + Column("v_xid", String(64), nullable=False), + Column("title", String(255), nullable=False), + Column("link", String(255), nullable=False), + Column("edition", String(255), default=''), + Column("duration", String(11), default='0'), + Column("watch_number", Integer, default=0), + Column("follow_number", Integer, default=0), + Column("video_number", Integer, default=0), + Column("public_time", DateTime), + Column("cover_pic", String(255)), + Column("sort", Integer), + Column("u_xid", String(64)), + Column("u_id", String(100)), + Column("u_pic", String(255)), + Column("u_name", String(255)), + Column("status", Integer, default=1), + Column("createtime", Integer, default=0), + Column("updatetime", Integer, default=0), + Column("operatetime", Integer, default=0), + ) # 作者表 video_author = Table( "sh_dm_video_author", @@ -116,7 +116,8 @@ def mysql_retry(max_retries: int = 3, base_delay: float = 1.0): }.get(errno, f"MySQL错误{errno}") wait = base_delay * (2 ** (attempt - 1)) - logger.warning(f"[MySQL][{fn.__name__}] 第{attempt}次重试({errno} {reason}):{e},等待 {wait:.1f}s...") + logger.warning( + f"[MySQL][{fn.__name__}] 第{attempt}次重试({errno} {reason}):{e},等待 {wait:.1f}s...") # 仅对断连类错误尝试重连 if errno in {2013, 2006}: @@ -128,8 +129,10 @@ def mysql_retry(max_retries: int = 3, base_delay: float = 1.0): raise return wrapper + return decorator + def redis_retry(max_retries: int = 3): """ 装饰器工厂:指定最大重试次数。 @@ -175,6 +178,7 @@ class DBVidcon: self.l0_list_key = "video_l0_queue" self.l1_list_key = "video_l1_queue" self.l2_list_key = "video_l2_queue" + self.web_list_key = "video_web_queue" self.report_list = "report_queue" self.error_list_key = "error_save_queue" self.conn = pymysql.connect(**self._MYSQL_CONF) @@ -257,6 +261,13 @@ class DBVidcon: self.redis.lpush(self.l2_list_key, *raws) logger.info(f"[写入l2] 已推入 {len(raws)} 条") + @redis_retry(max_retries=3) + def push_web(self, raws): + if isinstance(raws, str): + raws = [raws] + self.redis.lpush(self.web_list_key, *raws) + logger.info(f"[写入web] 已推入 {len(raws)} 条") + @redis_retry(max_retries=3) def push_report(self, raws): """原子操作:清空列表并写入新数据""" @@ -318,6 +329,17 @@ class DBVidcon: items = [] return items + @redis_retry(max_retries=3) + def get_web_items(self, count: int = 1): + try: + items = self.fetch_from_redis(count, list_key=self.web_list_key) + except Exception as e: + logger.info("[Redis web pop error]", e) + self.reconnect_redis() + items = [] + return items + + @redis_retry(max_retries=3) def item_keyword(self, count: int = 20): try: @@ -373,53 +395,48 @@ class DBVidcon: @mysql_retry() def get_report_video(self): sql = """ - SELECT - id, - name_title, - link - FROM - sh_dm_fight_records - WHERE - status = 1 - """ + SELECT id, + name_title, + link + FROM sh_dm_fight_records + WHERE status = 1 \ + """ self.cursor.execute(sql) return self.cursor.fetchall() @mysql_retry() - def get_subsequent_report_video(self,did: int): + def get_subsequent_report_video(self, did: int): sql = """ - SELECT DISTINCT report_id - FROM sh_dm_fight_records - WHERE - status = 2 - AND subsequent_status = 1 - AND report_time != '' - AND mid = %s - """ - self.cursor.execute(sql,did) + SELECT DISTINCT report_id + FROM sh_dm_fight_records + WHERE status = 2 + AND subsequent_status = 1 + AND report_time != '' + AND mid = %s \ + """ + self.cursor.execute(sql, did) return self.cursor.fetchall() @mysql_retry() def getreport_video(self): sql = """ - SELECT - id, - v_xid - FROM - sh_dm_fight_records - WHERE - is_removed = '' or is_removed IS NULL or is_removed = 0 - """ + SELECT id, + v_xid + FROM sh_dm_fight_records + WHERE is_removed = '' + or is_removed IS NULL + or is_removed = 0 \ + """ self.cursor.execute(sql) return self.cursor.fetchall() @mysql_retry() def mark_video_removed(self, d_id: int, removed_flag: int = 1): sql = """ - UPDATE sh_dm_fight_records - SET is_removed = %s - WHERE id = %s - """ + UPDATE sh_dm_fight_records + SET is_removed = %s + WHERE id = %s \ + """ self.cursor.execute(sql, (removed_flag, d_id)) self.flush() @@ -464,11 +481,12 @@ class DBVidcon: @mysql_retry() def update_video_ts_status(self): sql = """ - UPDATE sh_dm_video_v2 v - JOIN sh_dm_video_author a ON v.u_xid = a.u_xid - SET v.ts_status = 3 - WHERE a.white_status = 1; - """ + UPDATE sh_dm_video_v2 v + JOIN sh_dm_video_author a + ON v.u_xid = a.u_xid + SET v.ts_status = 3 + WHERE a.white_status = 1; \ + """ self.cursor.execute(sql) self.flush() logger.info("[更新视频举报状态] 已执行完毕") @@ -488,38 +506,32 @@ class DBVidcon: while True: try: sql_op = """ - INSERT INTO sh_dm_video_op_v2 ( - v_id, v_xid, a_id, level, name_title, - keyword, rn, history_status, is_repeat, - sort, createtime, updatetime, batch, machine - ) VALUES ( - %(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s, - %(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s, - %(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s - ) - """ + INSERT INTO sh_dm_video_op_v2 (v_id, v_xid, a_id, level, name_title, + keyword, rn, history_status, is_repeat, + sort, createtime, updatetime, batch, machine) + VALUES (%(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s, + %(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s, + %(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s) \ + """ self.cursor.execute(sql_op, data) sql_update = """ - INSERT INTO sh_dm_video_v2 ( - v_id, v_xid, rn, v_name, title, link, - edition, duration, - public_time, cover_pic, sort, - u_xid, u_id, u_pic, u_name, - status, createtime, updatetime - ) VALUES ( - %(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s, - '', %(duration)s, - %(create_time)s, %(cover_pic)s, %(sort)s, - %(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s, - 1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP() - ) - ON DUPLICATE KEY UPDATE - title = VALUES(title), - duration = VALUES(duration), - cover_pic = VALUES(cover_pic), - sort = VALUES(sort), - updatetime = UNIX_TIMESTAMP(); - """ + INSERT INTO sh_dm_video_v2 (v_id, v_xid, rn, v_name, title, link, + edition, duration, + public_time, cover_pic, sort, + u_xid, u_id, u_pic, u_name, + status, createtime, updatetime) + VALUES (%(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s, + '', %(duration)s, + %(create_time)s, %(cover_pic)s, %(sort)s, + %(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s, + 1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP()) ON DUPLICATE KEY + UPDATE + title = + VALUES (title), duration = + VALUES (duration), cover_pic = + VALUES (cover_pic), sort = + VALUES (sort), updatetime = UNIX_TIMESTAMP(); \ + """ self.cursor.execute(sql_update, data) break # 成功跳出重试循环 @@ -545,33 +557,26 @@ class DBVidcon: @mysql_retry() def fetch_keyword_title(self, level: int = 99): sql = """ - SELECT - k.keyword, - k.rn, - t.title AS v_name, - ANY_VALUE(t.level) AS level - FROM - sh_dm_keyword k - LEFT JOIN - sh_dm_title t ON k.title = t.title - WHERE - k.status = 1 - AND t.status = 1 - AND NOT EXISTS ( - SELECT 1 - FROM sh_dm_black_keyword b - WHERE - ( - (b.keyword IS NULL OR b.keyword = '') AND b.title = t.title - ) - OR ( - b.keyword IS NOT NULL AND b.keyword != '' AND b.keyword = k.keyword - ) - ) - AND t.level = %s - GROUP BY - k.keyword, k.rn; - """ + SELECT k.keyword, + k.rn, + t.title AS v_name, + ANY_VALUE(t.level) AS level + FROM sh_dm_keyword k + LEFT JOIN + sh_dm_title t ON k.title = t.title + WHERE k.status = 1 + AND t.status = 1 + AND NOT EXISTS (SELECT 1 + FROM sh_dm_black_keyword b + WHERE ( + (b.keyword IS NULL OR b.keyword = '') AND b.title = t.title + ) + OR ( + b.keyword IS NOT NULL AND b.keyword != '' AND b.keyword = k.keyword + )) + AND t.level = %s + GROUP BY k.keyword, k.rn; \ + """ self.cursor.execute(sql, (level,)) return self.cursor.fetchall() @@ -592,10 +597,10 @@ class DBVidcon: start_ts = int(time.time()) sql = """ - INSERT INTO sh_dm_batch_log - (batch, info, t0, t1, t2, starttime) - VALUES (%s, %s, %s, %s, %s, %s) - """ + INSERT INTO sh_dm_batch_log + (batch, info, t0, t1, t2, starttime) + VALUES (%s, %s, %s, %s, %s, %s) \ + """ try: self.cursor.execute(sql, (batch, level, t0, t1, t2, start_ts)) self.conn.commit() @@ -666,6 +671,10 @@ class DBVidcon: def l2_empty(self) -> bool: return self.redis.llen(self.l2_list_key) == 0 + @redis_retry(max_retries=3) + def web_empty(self) -> bool: + return self.redis.llen(self.web_list_key) == 0 + @redis_retry(max_retries=3) def pop_error_item(self): """ @@ -679,30 +688,30 @@ class DBVidcon: class DBSA: # ======= 可调参数 ======= - FLUSH_EVERY_ROWS = 100 # 行数阈值 - FLUSH_INTERVAL = 30 # 秒阈值 - MAX_SQL_RETRY = 3 # SQL 死锁自旋 - SQL_RETRY_BASE_SLEEP = 0.5 # 自旋退避基数 - FLUSH_RETRY = 3 # flush 整体轮次 - DELAY_ON_FAIL = 10 # flush 失败等待 - DEADLOCK_ERRNO = 1213 # MySQL 死锁码 - LOCK_TIMEOUT = 3 # 互斥锁超时 + FLUSH_EVERY_ROWS = 100 # 行数阈值 + FLUSH_INTERVAL = 30 # 秒阈值 + MAX_SQL_RETRY = 3 # SQL 死锁自旋 + SQL_RETRY_BASE_SLEEP = 0.5 # 自旋退避基数 + FLUSH_RETRY = 3 # flush 整体轮次 + DELAY_ON_FAIL = 10 # flush 失败等待 + DEADLOCK_ERRNO = 1213 # MySQL 死锁码 + LOCK_TIMEOUT = 3 # 互斥锁超时 # ======================== # ----- 缓冲区 ----- - _buf_op = [] - _buf_vid = [] + _buf_op = [] + _buf_vid = [] _buf_payload = [] - _last_flush = time.time() + _last_flush = time.time() # ----- 并发控制 ----- - _lock = threading.Lock() - _existing_op_keys = set() + _lock = threading.Lock() + _existing_op_keys = set() _existing_vid_keys = set() # ----- queue / 后台线程模式 ----- _queue_mode = False - _queue = Queue() + _queue = Queue() # ================== 退回 Redis 模拟 ================== @staticmethod @@ -726,7 +735,7 @@ class DBSA: data["sort"] = data.get("index", 0) now_ts = int(time.time()) - op_index_key = (data["v_xid"] or "", data["keyword"] or "", now_ts) + op_index_key = (data["v_xid"] or "", data["keyword"] or "", now_ts) vid_index_key = (data["v_xid"] or "", data["v_name"] or "") # ---------- ① 获取互斥锁 ---------- @@ -748,8 +757,8 @@ class DBSA: keyword=data["keyword"], is_repeat=data["is_repeat"], sort=data["sort"], - createtime=now_ts, # 首次插入 - updatetime=now_ts, # 后续更新只改这一列 + createtime=now_ts, # 首次插入 + updatetime=now_ts, # 后续更新只改这一列 batch=data.get("batch", 0), machine=data.get("machine_id", 0), is_piracy=data.get("is_piracy", '3'), @@ -850,9 +859,9 @@ class DBSA: if not cls._lock.acquire(timeout=cls.LOCK_TIMEOUT): raise RuntimeError("flush 未取得 cls._lock,可能死锁") try: - op_rows = cls._buf_op[:] - vid_rows = cls._buf_vid[:] - payloads = cls._buf_payload[:] + op_rows = cls._buf_op[:] + vid_rows = cls._buf_vid[:] + payloads = cls._buf_payload[:] cls._clear_buffers() finally: cls._lock.release() @@ -894,9 +903,9 @@ class DBSA: if op_rows: stmt_op = mysql_insert(video_op).values(op_rows) ondup_op = stmt_op.on_duplicate_key_update( - updatetime = stmt_op.inserted.updatetime, + updatetime=stmt_op.inserted.updatetime, # ts_status = stmt_op.inserted.ts_status, - is_repeat = stmt_op.inserted.is_repeat, + is_repeat=stmt_op.inserted.is_repeat, ) cls._safe_execute(ondup_op, desc="video_op") logger.info("落表:操作记录 %d 条", len(op_rows)) @@ -905,23 +914,23 @@ class DBSA: if vid_rows: stmt_vid = mysql_insert(video).values(vid_rows) ondup_vid = stmt_vid.on_duplicate_key_update( - title = stmt_vid.inserted.title, - v_name = stmt_vid.inserted.v_name, - link = stmt_vid.inserted.link, - edition = stmt_vid.inserted.edition, - duration = stmt_vid.inserted.duration, - watch_number = stmt_vid.inserted.watch_number, - follow_number = stmt_vid.inserted.follow_number, - video_number = stmt_vid.inserted.video_number, - public_time = stmt_vid.inserted.public_time, - cover_pic = stmt_vid.inserted.cover_pic, - sort = stmt_vid.inserted.sort, - u_xid = stmt_vid.inserted.u_xid, - u_id = stmt_vid.inserted.u_id, - u_pic = stmt_vid.inserted.u_pic, - u_name = stmt_vid.inserted.u_name, - status = stmt_vid.inserted.status, - updatetime = stmt_vid.inserted.updatetime, + title=stmt_vid.inserted.title, + v_name=stmt_vid.inserted.v_name, + link=stmt_vid.inserted.link, + edition=stmt_vid.inserted.edition, + duration=stmt_vid.inserted.duration, + watch_number=stmt_vid.inserted.watch_number, + follow_number=stmt_vid.inserted.follow_number, + video_number=stmt_vid.inserted.video_number, + public_time=stmt_vid.inserted.public_time, + cover_pic=stmt_vid.inserted.cover_pic, + sort=stmt_vid.inserted.sort, + u_xid=stmt_vid.inserted.u_xid, + u_id=stmt_vid.inserted.u_id, + u_pic=stmt_vid.inserted.u_pic, + u_name=stmt_vid.inserted.u_name, + status=stmt_vid.inserted.status, + updatetime=stmt_vid.inserted.updatetime, ) cls._safe_execute(ondup_vid, desc="video") logger.info("落表:视频记录 %d 条", len(vid_rows)) @@ -985,7 +994,7 @@ class DBSA: data["sort"] = data.get("index", 0) now_ts = int(time.time()) - op_key = (data["v_xid"] or "", data["keyword"] or "", now_ts) + op_key = (data["v_xid"] or "", data["keyword"] or "", now_ts) vid_key = (data["v_xid"] or "", data["v_name"] or "") if op_key in cls._existing_op_keys or vid_key in cls._existing_vid_keys: return diff --git a/onoe.py b/onoe.py index e2db259..f2b4f85 100644 --- a/onoe.py +++ b/onoe.py @@ -34,19 +34,7 @@ UserAgent = [ 'User-Agent, Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Maxthon 2.0)', 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Maxthon/5.0.3.4000 Chrome/47.0.2526.73 Safari/537.36', 'User-Agent, Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; The World)'] -sec_ch_ua_list = [ - '"Chromium";v="0", "Not;A=Brand";v="24", "Google Chrome";v="0"', - '"Chromium";v="0", "Not;A=Brand";v="24", "Google Chrome";v="0"', - '"Chromium";v="0", "Not;A=Brand";v="24", "Google Chrome";v="0"', - '"Chromium";v="0", "Not;A=Brand";v="24", "Google Chrome";v="0"', - '"Chromium";v="0", "Not;A=Brand";v="24", "Google Chrome";v="0"', - '"Chromium";v="136", "Not;A=Brand";v="24", "Google Chrome";v="136"', - '"Chromium";v="128", "Not;A=Brand";v="24", "Google Chrome";v="128"', - '"Chromium";v="53", "Not;A=Brand";v="24", "Google Chrome";v="53"', - '"Chromium";v="0", "Not;A=Brand";v="24", "Google Chrome";v="0"', - '"Chromium";v="47", "Not;A=Brand";v="24", "Google Chrome";v="47"', - '"Chromium";v="0", "Not;A=Brand";v="24", "Google Chrome";v="0"' -] + def get_part_ids(part_num: int, take: int, offset: int = 0): part_ids = list(range(offset, offset + take)) @@ -233,17 +221,14 @@ def gettoken(proxy, r=2): } try: proxy_str = db.get_proxy(proxy) - logger.info(f"[代理] => {proxy_str}") url = 'https://graphql.api.dailymotion.com/oauth/token' response = requests.post(url, headers=headers, data=data, proxies={"http": proxy_str, "https": proxy_str}) token = response.json()['access_token'] copy_headers = copy.deepcopy(headers1) - uaidx = random.randint(0, len(UserAgent) - 1) copy_headers['authorization'] = "Bearer " + token copy_headers['x-dm-visit-id'] = str(int(time.time() * 1000)) copy_headers['x-dm-visitor-id'] = uuid_with_dash - copy_headers['User-Agent'] = UserAgent[uaidx] - copy_headers['sec-ch-ua'] = sec_ch_ua_list[uaidx] + copy_headers['User-Agent'] = UserAgent[random.randint(0, len(UserAgent) - 1)] copy_headers['X-DM-Preferred-Country'] = proxy.lower() with _cache_lock: _headers_cache = copy_headers @@ -282,18 +267,18 @@ def solve_recaptcha_v3_with_proxy( payload = { "clientKey": "CAP-A76C932D4C6CCB3CA748F77FDC07D996", "task": { - "type": "ReCaptchaV3TaskProxyLess", + "type": "ReCaptchaV3Task", "websiteURL": f"https://www.dailymotion.com/search/{encoded_query}/top-results", "websiteKey": "6LeOJBIrAAAAAPMIjyYvo-eN_9W1HDOkrEqHR8tM", - "pageAction": "search", + "pageAction": "___grecaptcha_cfg.clients['100000']['L']['L']['promise-callback'](gRecaptchaResponse)", "minScore": 0.5 } } - resp = requests.post(create_url, data=json.dumps(payload), headers=headers, timeout=30) + resp = requests.post(create_url, json=payload, headers=headers, timeout=30) logger.info(f"[token] 发送 payload:{payload}") resp.raise_for_status() task_id = resp.json()["taskId"] - logger.info(f"task_id: {resp.text}") + logger.info(f"task_id: {task_id}") # 轮询获取结果 check_payload = {"clientKey": "CAP-A76C932D4C6CCB3CA748F77FDC07D996", "taskId": task_id} for i in range(max_poll_attempts): @@ -302,7 +287,7 @@ def solve_recaptcha_v3_with_proxy( result = r.json() logger.info(f"第{i}次,task_id:{task_id},结果:{result}") if result.get("status") == "ready": - return result["solution"]["gRecaptchaResponse"] + return result["solution"]["token"] time.sleep(polling_interval) raise TimeoutError(f"任务 {task_id} 在轮询 {max_poll_attempts} 次后未完成")