From 217d8c7ed7b010f000d8d205907f4d270bc2408a Mon Sep 17 00:00:00 2001 From: Franklin-F Date: Thu, 22 May 2025 21:46:11 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=20DB.py=20=E5=92=8C?= =?UTF-8?q?=20dump=5Fkeyword=5Ftitle.py=EF=BC=8C=E5=A2=9E=E5=BC=BA?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86=E5=92=8C=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E8=AE=B0=E5=BD=95=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DB.py | 112 ++++++++++++++++++++++++++++-------------- dump_keyword_title.py | 35 ++++++++----- 2 files changed, 99 insertions(+), 48 deletions(-) diff --git a/DB.py b/DB.py index 0aaa5c9..c2f7738 100644 --- a/DB.py +++ b/DB.py @@ -26,43 +26,42 @@ _engine = create_engine( _meta = MetaData() video_op = Table("sh_dm_video_op_v2", _meta, - Column("v_id", BigInteger, primary_key=True), - Column("v_xid", String(64)), - Column("a_id", Integer), - Column("level", Integer), - Column("name_title", String(255)), - Column("keyword", String(255)), - Column("rn", String(8)), - Column("history_status", String(32)), - Column("is_repeat", Integer), - Column("sort", Integer), - Column("createtime", Integer), - Column("updatetime", Integer), - Column("batch", BigInteger), - Column("machine", Integer), -) + Column("v_id", BigInteger, primary_key=True), + Column("v_xid", String(64)), + Column("a_id", Integer), + Column("level", Integer), + Column("name_title", String(255)), + Column("keyword", String(255)), + Column("rn", String(8)), + Column("history_status", String(32)), + Column("is_repeat", Integer), + Column("sort", Integer), + Column("createtime", Integer), + Column("updatetime", Integer), + Column("batch", BigInteger), + Column("machine", Integer), + ) video = Table("sh_dm_video_v2", _meta, - Column("v_id", BigInteger, primary_key=True), - Column("v_xid", String(64)), - Column("rn", String(8)), - Column("v_name", String(255)), - Column("title", String(255)), - Column("link", Text), - Column("edition", String(64)), - Column("duration", Integer), - Column("public_time", String(32)), - Column("cover_pic", Text), - Column("sort", Integer), - Column("u_xid", String(64)), - Column("u_id", BigInteger), - Column("u_pic", Text), - Column("u_name", String(255)), - Column("status", Integer), - Column("createtime", Integer), - Column("updatetime", Integer), -) - + Column("v_id", BigInteger, primary_key=True), + Column("v_xid", String(64)), + Column("rn", String(8)), + Column("v_name", String(255)), + Column("title", String(255)), + Column("link", Text), + Column("edition", String(64)), + Column("duration", Integer), + Column("public_time", String(32)), + Column("cover_pic", Text), + Column("sort", Integer), + Column("u_xid", String(64)), + Column("u_id", BigInteger), + Column("u_pic", Text), + Column("u_name", String(255)), + Column("status", Integer), + Column("createtime", Integer), + Column("updatetime", Integer), + ) def mysql_retry(max_retries: int = 3, base_delay: float = 2.0): @@ -370,6 +369,35 @@ class DBVidcon: self.cursor.execute(sql, (level,)) return self.cursor.fetchall() + @mysql_retry() + def log_batch_start(self, info: Dict) -> int or None: + batch = info.get("batch") + level = info.get("level") + if batch is None or level is None: + raise ValueError("info 字典必须包含 'batch' 和 'level'") + count = info.get("count", 0) + if level == 0: + t0, t1, t2 = count, 0, 0 + elif level == 1: + t0, t1, t2 = 0, count, 0 + elif level == 9: + level = 2 + t0, t1, t2 = 0, 0, count + + start_ts = int(time.time()) + sql = """ + INSERT INTO sh_dm_batch_log + (batch, info, t0, t1, t2, starttime) + VALUES (%s, %s, %s, %s, %s, %s) + """ + try: + self.cursor.execute(sql, (batch, level, t0, t1, t2, start_ts)) + self.conn.commit() + return self.cursor.lastrowid + except Exception as e: + print(f"[log_batch_start] 插入失败:{e}") + return None + @mysql_retry() def flush(self): """批量执行完后手动提交。""" @@ -411,16 +439,26 @@ class DBVidcon: @redis_retry(max_retries=3) def queues_empty(self) -> bool: """ - 判断 urgent_list_key 和 list_key 两个队列是否都为空。 如果都空,返回 True;只要有一个不空,就返回 False。 """ - # 注意:redis.llen 返回 int return ( self.redis.llen(self.l0_list_key) == 0 and self.redis.llen(self.l1_list_key) == 0 and self.redis.llen(self.l2_list_key) == 0 ) + @redis_retry() + def l0_empty(self) -> bool: + return self.redis.llen(self.l0_list_key) == 0 + + @redis_retry() + def l1_empty(self) -> bool: + return self.redis.llen(self.l1_list_key) == 0 + + @redis_retry() + def l2_empty(self) -> bool: + return self.redis.llen(self.l2_list_key) == 0 + @redis_retry(max_retries=3) def pop_error_item(self): """ diff --git a/dump_keyword_title.py b/dump_keyword_title.py index 014b7c8..dcdbf3f 100644 --- a/dump_keyword_title.py +++ b/dump_keyword_title.py @@ -14,28 +14,41 @@ def main(): args = parse_args() batch = int(time.time()) db = DBVidcon() + push = None + empty = None - rows = db.fetch_keyword_title(level=args.level) - payload_list = [] - push = None if args.level == 0: push = db.push_l0 + empty = db.l0_empty elif args.level == 1: push = db.push_l1 + empty = db.l1_empty elif args.level == 9: push = db.push_l2 + empty = db.l2_empty else: return - for row in rows: - payload_list.append(json.dumps({**row, "batch": batch}, ensure_ascii=False)) - if len(payload_list) >= 10000: - push(payload_list) - payload_list.clear() - if payload_list: # 收尾 - push(payload_list) + if empty(): + rows = db.fetch_keyword_title(level=args.level) + payload_list = [] - print(f"✔ 推送 {len(rows)} 行(batch={batch})到 {push.__name__}队列完毕") + + for row in rows: + payload_list.append(json.dumps({**row, "batch": batch}, ensure_ascii=False)) + if len(payload_list) >= 10000: + push(payload_list) + payload_list.clear() + if payload_list: # 收尾 + push(payload_list) + + data = { + "level": args.level, + "batch": batch, + "count": len(rows), + } + db.log_batch_start(data) + print(f"✔ 推送 {len(rows)} 行(batch={batch})到 {push.__name__}队列完毕") db.close() if __name__ == "__main__":