From fbbc1ec1bc1c128312787cc3dce5c5e412292374 Mon Sep 17 00:00:00 2001 From: Franklin-F Date: Wed, 18 Jun 2025 17:32:06 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E6=8A=A5=E5=91=8A?= =?UTF-8?q?=E6=8E=A8=E9=80=81=E5=8A=9F=E8=83=BD=EF=BC=8C=E5=90=91report=5F?= =?UTF-8?q?queue=E5=86=99=E5=85=A5=E6=95=B0=E6=8D=AE=E5=B9=B6=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DB.py | 20 +++++++++++++++++++- push_report_to_redis.py | 19 +++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) create mode 100644 push_report_to_redis.py diff --git a/DB.py b/DB.py index 6900098..1375a93 100644 --- a/DB.py +++ b/DB.py @@ -160,6 +160,7 @@ class DBVidcon: self.l0_list_key = "video_l0_queue" self.l1_list_key = "video_l1_queue" self.l2_list_key = "video_l2_queue" + self.report_list = "report_queue" self.error_list_key = "error_save_queue" self.conn = pymysql.connect(**self._MYSQL_CONF) self.cursor = self.conn.cursor() @@ -234,6 +235,14 @@ class DBVidcon: self.redis.lpush(self.l2_list_key, *raws) logger.info(f"[写入l2] 已推入 {len(raws)} 条") + @redis_retry(max_retries=3) + def push_report(self, raws): + """向 report_queue 写入数据""" + if isinstance(raws, str): + raws = [raws] + self.redis.rpush(self.report_list, *raws) + logger.info(f"[写入report] 已推入 {len(raws)} 条") + @redis_retry(max_retries=3) def get_proxy_agent_dict(self) -> dict: try: @@ -260,6 +269,16 @@ class DBVidcon: logger.info(result) return result['parameter'] if result else None + @redis_retry(max_retries=3) + def item_report(self, count: int = 20): + try: + items = self.fetch_from_redis(count, list_key=self.l0_list_key) + except Exception as e: + logger.info("[Redis l0 pop error]", e) + self.reconnect_redis() + items = [] + return items + @redis_retry(max_retries=3) def item_keyword(self, count: int = 20): try: @@ -323,7 +342,6 @@ class DBVidcon: sh_dm_fight_records WHERE status = 1 - LIMIT 1 """ self.cursor.execute(sql) return self.cursor.fetchall() diff --git a/push_report_to_redis.py b/push_report_to_redis.py new file mode 100644 index 0000000..ee3b2c8 --- /dev/null +++ b/push_report_to_redis.py @@ -0,0 +1,19 @@ +import json +from DB import DBVidcon +payload_list = [] +db = DBVidcon() +rows = db.get_report_video() +push = db.push_report + +# ======================= + +for row in rows: + payload_list.append(json.dumps({**row}, ensure_ascii=False)) + if len(payload_list) >= 10000: + push(payload_list) + payload_list.clear() +if payload_list: # 收尾 + push(payload_list) + +db.close() +