feat: 优化 DB.py 和 dump_keyword_title.py,增强数据处理和日志记录功能

This commit is contained in:
晓丰 2025-05-22 21:46:11 +08:00
parent 9b74bdf312
commit 217d8c7ed7
2 changed files with 99 additions and 48 deletions

112
DB.py
View File

@ -26,43 +26,42 @@ _engine = create_engine(
# Metadata object that the Table definition below is registered on.
_meta = MetaData()
# Table definition for "sh_dm_video_op_v2"; `v_id` (BigInteger) is the primary key.
# NOTE(review): the column list appears twice below with identical content. This
# looks like the removed and added sides of a whitespace-only diff hunk rather
# than intentional duplication -- confirm against DB.py and keep a single copy.
video_op = Table("sh_dm_video_op_v2", _meta,
Column("v_id", BigInteger, primary_key=True),
Column("v_xid", String(64)),
Column("a_id", Integer),
Column("level", Integer),
Column("name_title", String(255)),
Column("keyword", String(255)),
Column("rn", String(8)),
Column("history_status", String(32)),
Column("is_repeat", Integer),
Column("sort", Integer),
# presumably Unix timestamps stored as integers -- TODO confirm
Column("createtime", Integer),
Column("updatetime", Integer),
Column("batch", BigInteger),
Column("machine", Integer),
)
# Second copy of the same column list (see NOTE(review) at the top of this definition).
Column("v_id", BigInteger, primary_key=True),
Column("v_xid", String(64)),
Column("a_id", Integer),
Column("level", Integer),
Column("name_title", String(255)),
Column("keyword", String(255)),
Column("rn", String(8)),
Column("history_status", String(32)),
Column("is_repeat", Integer),
Column("sort", Integer),
Column("createtime", Integer),
Column("updatetime", Integer),
Column("batch", BigInteger),
Column("machine", Integer),
)
# Table definition for "sh_dm_video_v2", registered on `_meta`; `v_id` (BigInteger)
# is the primary key. `link`, `cover_pic` and `u_pic` are Text columns, the other
# string columns are length-limited String columns.
# NOTE(review): the column list appears twice below with identical content. This
# looks like the removed and added sides of a whitespace-only diff hunk rather
# than intentional duplication -- confirm against DB.py and keep a single copy.
video = Table("sh_dm_video_v2", _meta,
Column("v_id", BigInteger, primary_key=True),
Column("v_xid", String(64)),
Column("rn", String(8)),
Column("v_name", String(255)),
Column("title", String(255)),
Column("link", Text),
Column("edition", String(64)),
Column("duration", Integer),
Column("public_time", String(32)),
Column("cover_pic", Text),
Column("sort", Integer),
Column("u_xid", String(64)),
Column("u_id", BigInteger),
Column("u_pic", Text),
Column("u_name", String(255)),
Column("status", Integer),
# presumably Unix timestamps stored as integers -- TODO confirm
Column("createtime", Integer),
Column("updatetime", Integer),
)
# Second copy of the same column list (see NOTE(review) at the top of this definition).
Column("v_id", BigInteger, primary_key=True),
Column("v_xid", String(64)),
Column("rn", String(8)),
Column("v_name", String(255)),
Column("title", String(255)),
Column("link", Text),
Column("edition", String(64)),
Column("duration", Integer),
Column("public_time", String(32)),
Column("cover_pic", Text),
Column("sort", Integer),
Column("u_xid", String(64)),
Column("u_id", BigInteger),
Column("u_pic", Text),
Column("u_name", String(255)),
Column("status", Integer),
Column("createtime", Integer),
Column("updatetime", Integer),
)
def mysql_retry(max_retries: int = 3, base_delay: float = 2.0):
@ -370,6 +369,35 @@ class DBVidcon:
self.cursor.execute(sql, (level,))
return self.cursor.fetchall()
@mysql_retry()
def log_batch_start(self, info: Dict) -> "int | None":
    """Insert a start record for one batch into ``sh_dm_batch_log``.

    The ``count`` value is written to exactly one of the counter columns
    ``t0``/``t1``/``t2`` depending on ``level`` (0 -> t0, 1 -> t1, 9 -> t2);
    the other two are written as 0. The ``info`` column receives the level,
    with level 9 stored as 2. ``starttime`` is the current Unix time in
    whole seconds.

    Args:
        info: Mapping with the required keys ``"batch"`` and ``"level"`` and
            an optional ``"count"`` (defaults to 0). ``level`` must be one
            of 0, 1 or 9.

    Returns:
        ``self.cursor.lastrowid`` after the insert has been committed, or
        ``None`` when the insert or the commit raised.

    Raises:
        ValueError: if ``batch`` or ``level`` is missing, or if ``level`` is
            not one of 0, 1, 9.
    """
    batch = info.get("batch")
    level = info.get("level")
    if batch is None or level is None:
        raise ValueError("info 字典必须包含 'batch''level'")

    # level -> index of the counter column (t0/t1/t2) that receives `count`.
    # The same index is also the value stored in the `info` column.
    slot_by_level = {0: 0, 1: 1, 9: 2}
    if level not in slot_by_level:
        # Previously an unknown level fell through the if/elif chain and
        # crashed later with UnboundLocalError on t0/t1/t2.
        raise ValueError(f"unsupported level: {level!r} (expected 0, 1 or 9)")
    slot = slot_by_level[level]

    counters = [0, 0, 0]
    counters[slot] = info.get("count", 0)
    t0, t1, t2 = counters

    start_ts = int(time.time())
    sql = """
    INSERT INTO sh_dm_batch_log
    (batch, info, t0, t1, t2, starttime)
    VALUES (%s, %s, %s, %s, %s, %s)
    """
    try:
        self.cursor.execute(sql, (batch, slot, t0, t1, t2, start_ts))
        self.conn.commit()
        return self.cursor.lastrowid
    except Exception as e:
        # Best-effort logging: a failed insert is reported and turned into None.
        # NOTE(review): because the error is swallowed here, the @mysql_retry()
        # wrapper presumably never sees it -- confirm that is intended.
        print(f"[log_batch_start] 插入失败:{e}")
        return None
@mysql_retry()
def flush(self):
"""批量执行完后手动提交。"""
@ -411,16 +439,26 @@ class DBVidcon:
@redis_retry(max_retries=3)
def queues_empty(self) -> bool:
    """
    Report whether the lists stored under l0_list_key, l1_list_key and
    l2_list_key are all empty.

    Returns True only when every one of them has length 0; returns False
    as soon as one is found that is not empty.
    """
    # redis.llen returns an int, so it can be compared with 0 directly.
    for key_attr in ("l0_list_key", "l1_list_key", "l2_list_key"):
        if self.redis.llen(getattr(self, key_attr)) != 0:
            return False
    return True
@redis_retry()
def l0_empty(self) -> bool:
    """Return True when the Redis list stored under ``l0_list_key`` has length 0."""
    pending = self.redis.llen(self.l0_list_key)
    return pending == 0
@redis_retry()
def l1_empty(self) -> bool:
    """Return True when the Redis list stored under ``l1_list_key`` has length 0."""
    pending = self.redis.llen(self.l1_list_key)
    return pending == 0
@redis_retry()
def l2_empty(self) -> bool:
    """Return True when the Redis list stored under ``l2_list_key`` has length 0."""
    pending = self.redis.llen(self.l2_list_key)
    return pending == 0
@redis_retry(max_retries=3)
def pop_error_item(self):
"""

View File

def main():
    """Queue one batch of keyword/title rows for the level given on the CLI.

    The batch id is the current Unix time in whole seconds. Levels 0, 1 and 9
    map to the ``push_l0``/``push_l1``/``push_l2`` methods and the matching
    ``l0_empty``/``l1_empty``/``l2_empty`` checks of ``DBVidcon``; any other
    level is ignored.

    Rows are fetched and pushed only when the target queue is currently
    empty. Each row is serialised as JSON with the batch id added and pushed
    in chunks of at most 10000 payloads; afterwards the batch is recorded
    through ``db.log_batch_start`` and a summary line is printed.

    The database handle is closed on every exit path, including the
    unsupported-level return and any exception raised while pushing.
    """
    args = parse_args()
    batch = int(time.time())
    db = DBVidcon()
    try:
        # level -> (push method, "is the target queue empty?" method)
        routes = {
            0: (db.push_l0, db.l0_empty),
            1: (db.push_l1, db.l1_empty),
            9: (db.push_l2, db.l2_empty),
        }
        route = routes.get(args.level)
        if route is None:
            return
        push, empty = route

        # Only queue a new batch once the previous one has been drained.
        if not empty():
            return

        rows = db.fetch_keyword_title(level=args.level)
        payload_list = []
        for row in rows:
            payload_list.append(json.dumps({**row, "batch": batch}, ensure_ascii=False))
            if len(payload_list) >= 10000:
                push(payload_list)
                payload_list.clear()
        if payload_list:  # flush the final partial chunk
            push(payload_list)

        data = {
            "level": args.level,
            "batch": batch,
            "count": len(rows),
        }
        db.log_batch_start(data)
        print(f"✔ 推送 {len(rows)}batch={batch})到 {push.__name__}队列完毕")
    finally:
        db.close()
if __name__ == "__main__":