feat: 添加报告推送功能,向report_queue写入数据并优化数据处理逻辑

This commit is contained in:
晓丰 2025-06-18 17:32:06 +08:00
parent 712a2cbdae
commit fbbc1ec1bc
2 changed files with 38 additions and 1 deletions

20
DB.py
View File

@ -160,6 +160,7 @@ class DBVidcon:
self.l0_list_key = "video_l0_queue" self.l0_list_key = "video_l0_queue"
self.l1_list_key = "video_l1_queue" self.l1_list_key = "video_l1_queue"
self.l2_list_key = "video_l2_queue" self.l2_list_key = "video_l2_queue"
self.report_list = "report_queue"
self.error_list_key = "error_save_queue" self.error_list_key = "error_save_queue"
self.conn = pymysql.connect(**self._MYSQL_CONF) self.conn = pymysql.connect(**self._MYSQL_CONF)
self.cursor = self.conn.cursor() self.cursor = self.conn.cursor()
@ -234,6 +235,14 @@ class DBVidcon:
self.redis.lpush(self.l2_list_key, *raws) self.redis.lpush(self.l2_list_key, *raws)
logger.info(f"[写入l2] 已推入 {len(raws)}") logger.info(f"[写入l2] 已推入 {len(raws)}")
@redis_retry(max_retries=3)
def push_report(self, raws):
"""向 report_queue 写入数据"""
if isinstance(raws, str):
raws = [raws]
self.redis.rpush(self.report_list, *raws)
logger.info(f"[写入report] 已推入 {len(raws)}")
@redis_retry(max_retries=3) @redis_retry(max_retries=3)
def get_proxy_agent_dict(self) -> dict: def get_proxy_agent_dict(self) -> dict:
try: try:
@ -260,6 +269,16 @@ class DBVidcon:
logger.info(result) logger.info(result)
return result['parameter'] if result else None return result['parameter'] if result else None
@redis_retry(max_retries=3)
def item_report(self, count: int = 20):
try:
items = self.fetch_from_redis(count, list_key=self.l0_list_key)
except Exception as e:
logger.info("[Redis l0 pop error]", e)
self.reconnect_redis()
items = []
return items
@redis_retry(max_retries=3) @redis_retry(max_retries=3)
def item_keyword(self, count: int = 20): def item_keyword(self, count: int = 20):
try: try:
@ -323,7 +342,6 @@ class DBVidcon:
sh_dm_fight_records sh_dm_fight_records
WHERE WHERE
status = 1 status = 1
LIMIT 1
""" """
self.cursor.execute(sql) self.cursor.execute(sql)
return self.cursor.fetchall() return self.cursor.fetchall()

19
push_report_to_redis.py Normal file
View File

@ -0,0 +1,19 @@
import json
from DB import DBVidcon
payload_list = []
db = DBVidcon()
rows = db.get_report_video()
push = db.push_report
# =======================
for row in rows:
payload_list.append(json.dumps({**row}, ensure_ascii=False))
if len(payload_list) >= 10000:
push(payload_list)
payload_list.clear()
if payload_list: # 收尾
push(payload_list)
db.close()