feat: 重构 dump_keyword_title.py,优化数据库查询和数据推送逻辑

This commit is contained in:
晓丰 2025-05-20 22:07:09 +08:00
parent 49c2e1d43c
commit 4f3051b100

View File

@@ -1,48 +1,6 @@
#!/usr/bin/env python3 import json, time
"""
用法示例
--------
# 用默认 level=1
python3 dump_keyword_title.py
# 指定 level=3
python3 dump_keyword_title.py -l 3
"""
import json, time, pymysql, redis
import argparse import argparse
from DB import DBVidcon
# ======= 配置区 =======
MYSQL_CONFIG = {
"host": "192.144.230.75", "port": 3306,
"user": "db_vidcon", "password": "rexdK4fhCCiRE4BZ",
"database": "db_vidcon", "charset": "utf8mb4",
"cursorclass": pymysql.cursors.DictCursor
}
REDIS_CONFIG = {
"host": "192.144.230.75", "port": 6379,
"password": "qwert@$123!&", "decode_responses": True
}
LIST_KEY = "video_kw_queue"
BATCH_SIZE = 1000
SQL = """
SELECT
k.keyword,
k.rn,
t.title AS v_name,
ANY_VALUE(t.level) AS level
FROM
sh_dm_keyword k
LEFT JOIN
sh_dm_title t ON k.title = t.title
WHERE
k.status = 1
AND t.status = 1
AND NOT EXISTS (
SELECT 1 FROM sh_dm_black_keyword b WHERE b.title = t.title
)
AND t.level = %s
GROUP BY k.keyword, k.rn
"""
def parse_args(): def parse_args():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
@@ -50,35 +8,35 @@ def parse_args():
) )
parser.add_argument("-l", "--level", type=int, default=99, parser.add_argument("-l", "--level", type=int, default=99,
help="value for t.level (default: 99)") help="value for t.level (default: 99)")
parser.add_argument("-u", "--urgent", type=int, default=0,
help="加急标记")
return parser.parse_args() return parser.parse_args()
def main(): def main():
args = parse_args() args = parse_args()
batch_ts = int(time.time()) batch = int(time.time())
conn = pymysql.connect(**MYSQL_CONFIG) db = DBVidcon()
cur = conn.cursor()
cur.execute(SQL, (args.level,))
r = redis.Redis(**REDIS_CONFIG)
pipe = r.pipeline()
total = 0
start = time.time()
global LIST_KEY
if args.urgent == 1:
LIST_KEY = "video_urgent_queue"
for row in cur: rows = db.fetch_keyword_title(level=args.level)
row["batch"] = batch_ts payload_list = []
pipe.lpush(LIST_KEY, json.dumps(row, ensure_ascii=False)) push = None
total += 1 if args.level == 0:
if total % BATCH_SIZE == 0: push = db.push_l0
pipe.execute() elif args.level == 1:
push = db.push_l1
elif args.level == 2:
push = db.push_l2
else:
return
if pipe.command_stack: for row in rows:
pipe.execute() payload_list.append(json.dumps({**row, "batch": batch}, ensure_ascii=False))
if len(payload_list) >= 10000:
push(payload_list)
payload_list.clear()
if payload_list: # 收尾
push(payload_list)
print(f"✔ 推送 {total}level={args.level}, batch={batch_ts})→ Redis '{LIST_KEY}',耗时 {time.time()-start:.2f}s") print(f"✔ 推送 {len(rows)}batch={batch})到 {push.__name__}队列完毕")
db.close()
if __name__ == "__main__": if __name__ == "__main__":
main() main()