diff --git a/DB.py b/DB.py index 6900098..1375a93 100644 --- a/DB.py +++ b/DB.py @@ -160,6 +160,7 @@ class DBVidcon: self.l0_list_key = "video_l0_queue" self.l1_list_key = "video_l1_queue" self.l2_list_key = "video_l2_queue" + self.report_list = "report_queue" self.error_list_key = "error_save_queue" self.conn = pymysql.connect(**self._MYSQL_CONF) self.cursor = self.conn.cursor() @@ -234,6 +235,14 @@ class DBVidcon: self.redis.lpush(self.l2_list_key, *raws) logger.info(f"[写入l2] 已推入 {len(raws)} 条") + @redis_retry(max_retries=3) + def push_report(self, raws): + """向 report_queue 写入数据""" + if isinstance(raws, str): + raws = [raws] + self.redis.rpush(self.report_list, *raws) + logger.info(f"[写入report] 已推入 {len(raws)} 条") + @redis_retry(max_retries=3) def get_proxy_agent_dict(self) -> dict: try: @@ -260,6 +269,16 @@ class DBVidcon: logger.info(result) return result['parameter'] if result else None + @redis_retry(max_retries=3) + def item_report(self, count: int = 20): + try: + items = self.fetch_from_redis(count, list_key=self.l0_list_key) + except Exception as e: + logger.info("[Redis l0 pop error]", e) + self.reconnect_redis() + items = [] + return items + @redis_retry(max_retries=3) def item_keyword(self, count: int = 20): try: @@ -323,7 +342,6 @@ class DBVidcon: sh_dm_fight_records WHERE status = 1 - LIMIT 1 """ self.cursor.execute(sql) return self.cursor.fetchall() diff --git a/push_report_to_redis.py b/push_report_to_redis.py new file mode 100644 index 0000000..ee3b2c8 --- /dev/null +++ b/push_report_to_redis.py @@ -0,0 +1,19 @@ +import json +from DB import DBVidcon +payload_list = [] +db = DBVidcon() +rows = db.get_report_video() +push = db.push_report + +# ======================= + +for row in rows: + payload_list.append(json.dumps({**row}, ensure_ascii=False)) + if len(payload_list) >= 10000: + push(payload_list) + payload_list.clear() +if payload_list: # 收尾 + push(payload_list) + +db.close() +