diff --git a/DB.py b/DB.py index 84e6b48..1e34ed4 100644 --- a/DB.py +++ b/DB.py @@ -29,51 +29,51 @@ _engine = create_engine( _meta = MetaData() # 操作记录表 video_op = Table("sh_dm_video_op_v3", _meta, - Column("id", Integer, primary_key=True, autoincrement=True), - Column("v_id", String(64)), - Column("v_xid", String(64)), - Column("a_id", Integer, default=0), - Column("level", Integer), - Column("name_title", String(100)), - Column("keyword", String(100)), - Column("rn", String(50)), - Column("history_status", String(100)), - Column("is_repeat", Integer), - Column("is_piracy", String(2), default='3'), - Column("sort", Integer), - Column("createtime", Integer), - Column("updatetime", Integer), - Column("operatetime", Integer), - Column("batch", Integer), - Column("machine", Integer), - Column("ts_status", Integer, default=1), - ) + Column("id", Integer, primary_key=True, autoincrement=True), + Column("v_id", String(64)), + Column("v_xid", String(64)), + Column("a_id", Integer, default=0), + Column("level", Integer), + Column("name_title", String(100)), + Column("keyword", String(100)), + Column("rn", String(50)), + Column("history_status", String(100)), + Column("is_repeat", Integer), + Column("is_piracy", String(2), default='3'), + Column("sort", Integer), + Column("createtime", Integer), + Column("updatetime", Integer), + Column("operatetime", Integer), + Column("batch", Integer), + Column("machine", Integer), + Column("ts_status", Integer, default=1), +) # 视频表 video = Table("sh_dm_video_v3", _meta, - Column("id", Integer, primary_key=True, autoincrement=True), - Column("v_id", String(64)), - Column("v_name", String(100), nullable=False), - Column("v_xid", String(64), nullable=False), - Column("title", String(255), nullable=False), - Column("link", String(255), nullable=False), - Column("edition", String(255), default=''), - Column("duration", String(11), default='0'), - Column("watch_number", Integer, default=0), - Column("follow_number", Integer, default=0), - Column("video_number", Integer, default=0), - Column("public_time", DateTime), - Column("cover_pic", String(255)), - Column("sort", Integer), - Column("u_xid", String(64)), - Column("u_id", String(100)), - Column("u_pic", String(255)), - Column("u_name", String(255)), - Column("status", Integer, default=1), - Column("createtime", Integer, default=0), - Column("updatetime", Integer, default=0), - Column("operatetime", Integer, default=0), - ) + Column("id", Integer, primary_key=True, autoincrement=True), + Column("v_id", String(64)), + Column("v_name", String(100), nullable=False), + Column("v_xid", String(64), nullable=False), + Column("title", String(255), nullable=False), + Column("link", String(255), nullable=False), + Column("edition", String(255), default=''), + Column("duration", String(11), default='0'), + Column("watch_number", Integer, default=0), + Column("follow_number", Integer, default=0), + Column("video_number", Integer, default=0), + Column("public_time", DateTime), + Column("cover_pic", String(255)), + Column("sort", Integer), + Column("u_xid", String(64)), + Column("u_id", String(100)), + Column("u_pic", String(255)), + Column("u_name", String(255)), + Column("status", Integer, default=1), + Column("createtime", Integer, default=0), + Column("updatetime", Integer, default=0), + Column("operatetime", Integer, default=0), +) # 作者表 video_author = Table( "sh_dm_video_author", @@ -116,8 +116,7 @@ def mysql_retry(max_retries: int = 3, base_delay: float = 1.0): }.get(errno, f"MySQL错误{errno}") wait = base_delay * (2 ** (attempt - 1)) - logger.warning( - f"[MySQL][{fn.__name__}] 第{attempt}次重试({errno} {reason}):{e},等待 {wait:.1f}s...") + logger.warning(f"[MySQL][{fn.__name__}] 第{attempt}次重试({errno} {reason}):{e},等待 {wait:.1f}s...") # 仅对断连类错误尝试重连 if errno in {2013, 2006}: @@ -129,10 +128,8 @@ def mysql_retry(max_retries: int = 3, base_delay: float = 1.0): raise return wrapper - return decorator - def redis_retry(max_retries: int = 3): """ 装饰器工厂:指定最大重试次数。 @@ -178,7 +175,6 @@ class DBVidcon: self.l0_list_key = "video_l0_queue" self.l1_list_key = "video_l1_queue" self.l2_list_key = "video_l2_queue" - self.web_list_key = "video_web_queue" self.report_list = "report_queue" self.error_list_key = "error_save_queue" self.conn = pymysql.connect(**self._MYSQL_CONF) @@ -261,13 +257,6 @@ class DBVidcon: self.redis.lpush(self.l2_list_key, *raws) logger.info(f"[写入l2] 已推入 {len(raws)} 条") - @redis_retry(max_retries=3) - def push_web(self, raws): - if isinstance(raws, str): - raws = [raws] - self.redis.lpush(self.web_list_key, *raws) - logger.info(f"[写入web] 已推入 {len(raws)} 条") - @redis_retry(max_retries=3) def push_report(self, raws): """原子操作:清空列表并写入新数据""" @@ -329,17 +318,6 @@ class DBVidcon: items = [] return items - @redis_retry(max_retries=3) - def get_web_items(self, count: int = 1): - try: - items = self.fetch_from_redis(count, list_key=self.web_list_key) - except Exception as e: - logger.info("[Redis web pop error]", e) - self.reconnect_redis() - items = [] - return items - - @redis_retry(max_retries=3) def item_keyword(self, count: int = 20): try: @@ -395,48 +373,53 @@ class DBVidcon: @mysql_retry() def get_report_video(self): sql = """ - SELECT id, - name_title, - link - FROM sh_dm_fight_records - WHERE status = 1 \ - """ + SELECT + id, + name_title, + link + FROM + sh_dm_fight_records + WHERE + status = 1 + """ self.cursor.execute(sql) return self.cursor.fetchall() @mysql_retry() - def get_subsequent_report_video(self, did: int): + def get_subsequent_report_video(self,did: int): sql = """ - SELECT DISTINCT report_id - FROM sh_dm_fight_records - WHERE status = 2 - AND subsequent_status = 1 - AND report_time != '' - AND mid = %s \ - """ - self.cursor.execute(sql, did) + SELECT DISTINCT report_id + FROM sh_dm_fight_records + WHERE + status = 2 + AND subsequent_status in (1, 3) + AND report_time != '' + AND mid = %s + """ + self.cursor.execute(sql,did) return self.cursor.fetchall() @mysql_retry() def getreport_video(self): sql = """ - SELECT id, - v_xid - FROM sh_dm_fight_records - WHERE is_removed = '' - or is_removed IS NULL - or is_removed = 0 \ - """ + SELECT + id, + v_xid + FROM + sh_dm_fight_records + WHERE + is_removed = '' or is_removed IS NULL or is_removed = 0 + """ self.cursor.execute(sql) return self.cursor.fetchall() @mysql_retry() def mark_video_removed(self, d_id: int, removed_flag: int = 1): sql = """ - UPDATE sh_dm_fight_records - SET is_removed = %s - WHERE id = %s \ - """ + UPDATE sh_dm_fight_records + SET is_removed = %s + WHERE id = %s + """ self.cursor.execute(sql, (removed_flag, d_id)) self.flush() @@ -481,12 +464,11 @@ class DBVidcon: @mysql_retry() def update_video_ts_status(self): sql = """ - UPDATE sh_dm_video_v2 v - JOIN sh_dm_video_author a - ON v.u_xid = a.u_xid - SET v.ts_status = 3 - WHERE a.white_status = 1; \ - """ + UPDATE sh_dm_video_v2 v + JOIN sh_dm_video_author a ON v.u_xid = a.u_xid + SET v.ts_status = 3 + WHERE a.white_status = 1; + """ self.cursor.execute(sql) self.flush() logger.info("[更新视频举报状态] 已执行完毕") @@ -506,32 +488,38 @@ class DBVidcon: while True: try: sql_op = """ - INSERT INTO sh_dm_video_op_v2 (v_id, v_xid, a_id, level, name_title, - keyword, rn, history_status, is_repeat, - sort, createtime, updatetime, batch, machine) - VALUES (%(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s, - %(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s, - %(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s) \ - """ + INSERT INTO sh_dm_video_op_v2 ( + v_id, v_xid, a_id, level, name_title, + keyword, rn, history_status, is_repeat, + sort, createtime, updatetime, batch, machine + ) VALUES ( + %(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s, + %(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s, + %(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s + ) + """ self.cursor.execute(sql_op, data) sql_update = """ - INSERT INTO sh_dm_video_v2 (v_id, v_xid, rn, v_name, title, link, - edition, duration, - public_time, cover_pic, sort, - u_xid, u_id, u_pic, u_name, - status, createtime, updatetime) - VALUES (%(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s, - '', %(duration)s, - %(create_time)s, %(cover_pic)s, %(sort)s, - %(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s, - 1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP()) ON DUPLICATE KEY - UPDATE - title = - VALUES (title), duration = - VALUES (duration), cover_pic = - VALUES (cover_pic), sort = - VALUES (sort), updatetime = UNIX_TIMESTAMP(); \ - """ + INSERT INTO sh_dm_video_v2 ( + v_id, v_xid, rn, v_name, title, link, + edition, duration, + public_time, cover_pic, sort, + u_xid, u_id, u_pic, u_name, + status, createtime, updatetime + ) VALUES ( + %(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s, + '', %(duration)s, + %(create_time)s, %(cover_pic)s, %(sort)s, + %(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s, + 1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP() + ) + ON DUPLICATE KEY UPDATE + title = VALUES(title), + duration = VALUES(duration), + cover_pic = VALUES(cover_pic), + sort = VALUES(sort), + updatetime = UNIX_TIMESTAMP(); + """ self.cursor.execute(sql_update, data) break # 成功跳出重试循环 @@ -557,26 +545,33 @@ class DBVidcon: @mysql_retry() def fetch_keyword_title(self, level: int = 99): sql = """ - SELECT k.keyword, - k.rn, - t.title AS v_name, - ANY_VALUE(t.level) AS level - FROM sh_dm_keyword k - LEFT JOIN - sh_dm_title t ON k.title = t.title - WHERE k.status = 1 - AND t.status = 1 - AND NOT EXISTS (SELECT 1 - FROM sh_dm_black_keyword b - WHERE ( - (b.keyword IS NULL OR b.keyword = '') AND b.title = t.title - ) - OR ( - b.keyword IS NOT NULL AND b.keyword != '' AND b.keyword = k.keyword - )) - AND t.level = %s - GROUP BY k.keyword, k.rn; \ - """ + SELECT + k.keyword, + k.rn, + t.title AS v_name, + ANY_VALUE(t.level) AS level + FROM + sh_dm_keyword k + LEFT JOIN + sh_dm_title t ON k.title = t.title + WHERE + k.status = 1 + AND t.status = 1 + AND NOT EXISTS ( + SELECT 1 + FROM sh_dm_black_keyword b + WHERE + ( + (b.keyword IS NULL OR b.keyword = '') AND b.title = t.title + ) + OR ( + b.keyword IS NOT NULL AND b.keyword != '' AND b.keyword = k.keyword + ) + ) + AND t.level = %s + GROUP BY + k.keyword, k.rn; + """ self.cursor.execute(sql, (level,)) return self.cursor.fetchall() @@ -597,10 +592,10 @@ class DBVidcon: start_ts = int(time.time()) sql = """ - INSERT INTO sh_dm_batch_log - (batch, info, t0, t1, t2, starttime) - VALUES (%s, %s, %s, %s, %s, %s) \ - """ + INSERT INTO sh_dm_batch_log + (batch, info, t0, t1, t2, starttime) + VALUES (%s, %s, %s, %s, %s, %s) + """ try: self.cursor.execute(sql, (batch, level, t0, t1, t2, start_ts)) self.conn.commit() @@ -671,10 +666,6 @@ class DBVidcon: def l2_empty(self) -> bool: return self.redis.llen(self.l2_list_key) == 0 - @redis_retry(max_retries=3) - def web_empty(self) -> bool: - return self.redis.llen(self.web_list_key) == 0 - @redis_retry(max_retries=3) def pop_error_item(self): """ @@ -688,30 +679,30 @@ class DBVidcon: class DBSA: # ======= 可调参数 ======= - FLUSH_EVERY_ROWS = 100 # 行数阈值 - FLUSH_INTERVAL = 30 # 秒阈值 - MAX_SQL_RETRY = 3 # SQL 死锁自旋 - SQL_RETRY_BASE_SLEEP = 0.5 # 自旋退避基数 - FLUSH_RETRY = 3 # flush 整体轮次 - DELAY_ON_FAIL = 10 # flush 失败等待 - DEADLOCK_ERRNO = 1213 # MySQL 死锁码 - LOCK_TIMEOUT = 3 # 互斥锁超时 + FLUSH_EVERY_ROWS = 100 # 行数阈值 + FLUSH_INTERVAL = 30 # 秒阈值 + MAX_SQL_RETRY = 3 # SQL 死锁自旋 + SQL_RETRY_BASE_SLEEP = 0.5 # 自旋退避基数 + FLUSH_RETRY = 3 # flush 整体轮次 + DELAY_ON_FAIL = 10 # flush 失败等待 + DEADLOCK_ERRNO = 1213 # MySQL 死锁码 + LOCK_TIMEOUT = 3 # 互斥锁超时 # ======================== # ----- 缓冲区 ----- - _buf_op = [] - _buf_vid = [] + _buf_op = [] + _buf_vid = [] _buf_payload = [] - _last_flush = time.time() + _last_flush = time.time() # ----- 并发控制 ----- - _lock = threading.Lock() - _existing_op_keys = set() + _lock = threading.Lock() + _existing_op_keys = set() _existing_vid_keys = set() # ----- queue / 后台线程模式 ----- _queue_mode = False - _queue = Queue() + _queue = Queue() # ================== 退回 Redis 模拟 ================== @staticmethod @@ -735,7 +726,7 @@ class DBSA: data["sort"] = data.get("index", 0) now_ts = int(time.time()) - op_index_key = (data["v_xid"] or "", data["keyword"] or "", now_ts) + op_index_key = (data["v_xid"] or "", data["keyword"] or "", now_ts) vid_index_key = (data["v_xid"] or "", data["v_name"] or "") # ---------- ① 获取互斥锁 ---------- @@ -757,8 +748,8 @@ class DBSA: keyword=data["keyword"], is_repeat=data["is_repeat"], sort=data["sort"], - createtime=now_ts, # 首次插入 - updatetime=now_ts, # 后续更新只改这一列 + createtime=now_ts, # 首次插入 + updatetime=now_ts, # 后续更新只改这一列 batch=data.get("batch", 0), machine=data.get("machine_id", 0), is_piracy=data.get("is_piracy", '3'), @@ -859,9 +850,9 @@ class DBSA: if not cls._lock.acquire(timeout=cls.LOCK_TIMEOUT): raise RuntimeError("flush 未取得 cls._lock,可能死锁") try: - op_rows = cls._buf_op[:] - vid_rows = cls._buf_vid[:] - payloads = cls._buf_payload[:] + op_rows = cls._buf_op[:] + vid_rows = cls._buf_vid[:] + payloads = cls._buf_payload[:] cls._clear_buffers() finally: cls._lock.release() @@ -903,9 +894,9 @@ class DBSA: if op_rows: stmt_op = mysql_insert(video_op).values(op_rows) ondup_op = stmt_op.on_duplicate_key_update( - updatetime=stmt_op.inserted.updatetime, + updatetime = stmt_op.inserted.updatetime, # ts_status = stmt_op.inserted.ts_status, - is_repeat=stmt_op.inserted.is_repeat, + is_repeat = stmt_op.inserted.is_repeat, ) cls._safe_execute(ondup_op, desc="video_op") logger.info("落表:操作记录 %d 条", len(op_rows)) @@ -914,23 +905,23 @@ class DBSA: if vid_rows: stmt_vid = mysql_insert(video).values(vid_rows) ondup_vid = stmt_vid.on_duplicate_key_update( - title=stmt_vid.inserted.title, - v_name=stmt_vid.inserted.v_name, - link=stmt_vid.inserted.link, - edition=stmt_vid.inserted.edition, - duration=stmt_vid.inserted.duration, - watch_number=stmt_vid.inserted.watch_number, - follow_number=stmt_vid.inserted.follow_number, - video_number=stmt_vid.inserted.video_number, - public_time=stmt_vid.inserted.public_time, - cover_pic=stmt_vid.inserted.cover_pic, - sort=stmt_vid.inserted.sort, - u_xid=stmt_vid.inserted.u_xid, - u_id=stmt_vid.inserted.u_id, - u_pic=stmt_vid.inserted.u_pic, - u_name=stmt_vid.inserted.u_name, - status=stmt_vid.inserted.status, - updatetime=stmt_vid.inserted.updatetime, + title = stmt_vid.inserted.title, + v_name = stmt_vid.inserted.v_name, + link = stmt_vid.inserted.link, + edition = stmt_vid.inserted.edition, + duration = stmt_vid.inserted.duration, + watch_number = stmt_vid.inserted.watch_number, + follow_number = stmt_vid.inserted.follow_number, + video_number = stmt_vid.inserted.video_number, + public_time = stmt_vid.inserted.public_time, + cover_pic = stmt_vid.inserted.cover_pic, + sort = stmt_vid.inserted.sort, + u_xid = stmt_vid.inserted.u_xid, + u_id = stmt_vid.inserted.u_id, + u_pic = stmt_vid.inserted.u_pic, + u_name = stmt_vid.inserted.u_name, + status = stmt_vid.inserted.status, + updatetime = stmt_vid.inserted.updatetime, ) cls._safe_execute(ondup_vid, desc="video") logger.info("落表:视频记录 %d 条", len(vid_rows)) @@ -994,7 +985,7 @@ class DBSA: data["sort"] = data.get("index", 0) now_ts = int(time.time()) - op_key = (data["v_xid"] or "", data["keyword"] or "", now_ts) + op_key = (data["v_xid"] or "", data["keyword"] or "", now_ts) vid_key = (data["v_xid"] or "", data["v_name"] or "") if op_key in cls._existing_op_keys or vid_key in cls._existing_vid_keys: return diff --git a/main.py b/main.py index 8a9eae2..f2fc056 100644 --- a/main.py +++ b/main.py @@ -59,11 +59,11 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2): if r == 2: logger.info(f"NET处理->{keyword},\trn->{proxy_name},\tlevel->{level}") video_list = [] - max_page = 2 - limit = 30 + max_page = 3 + limit = 100 endpoint = 'https://api.dailymotion.com/videos' if level == 0 or level == 1: - max_page = 3 + max_page = 4 limit = 100 for j in range(1, max_page): params = {