From b54f30c2008e3502d2f26700c541be43fabf8649 Mon Sep 17 00:00:00 2001 From: Franklin-F Date: Tue, 20 May 2025 21:29:11 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E8=A7=86=E9=A2=91?= =?UTF-8?q?=E9=98=9F=E5=88=97=E9=94=AE=E5=90=8D=E5=92=8C=E5=9B=9E=E6=BB=9A?= =?UTF-8?q?=E9=80=BB=E8=BE=91=EF=BC=8C=E4=BC=98=E5=8C=96=E8=A7=86=E9=A2=91?= =?UTF-8?q?=E5=A4=84=E7=90=86=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- DB.py | 95 ++++++++++++++++++++++++----------------------------------- 1 file changed, 38 insertions(+), 57 deletions(-) diff --git a/DB.py b/DB.py index 3d93a17..2513e8c 100644 --- a/DB.py +++ b/DB.py @@ -76,9 +76,10 @@ class DBVidcon: } def __init__(self): - self.l1_list_key = "video_l1_queue" - self.error_list_key = "error_save_queue" self.l0_list_key = "video_l0_queue" + self.l1_list_key = "video_l1_queue" + self.l2_list_key = "video_l2_queue" + self.error_list_key = "error_save_queue" self.conn = pymysql.connect(**self._MYSQL_CONF) self.cursor = self.conn.cursor() self.redis = redis.Redis(**self._REDIS_CONF) @@ -100,34 +101,6 @@ class DBVidcon: raw = json.dumps(data, ensure_ascii=False) self.redis.lpush(self.error_list_key, raw) - # def fetch_records(self, count: int = 100): - # try: - # raws = self.redis.lpop(self.record_list_key, count) - # except TypeError: - # raws = [] - # for _ in range(count): - # item = self.redis.rpop(self.record_list_key) - # if item is None: - # break - # raws.append(item) - # except redis.exceptions.ConnectionError: - # self.reconnect_redis() - # return self.fetch_records(count) - # - # if not raws: - # return [] - # if isinstance(raws, str): - # raws = [raws] - # - # out = [] - # for raw in raws: - # try: - # data = json.loads(raw) - # out.append((raw, data)) - # except json.JSONDecodeError: - # continue - # return out - @redis_retry(max_retries=3) def fetch_from_redis(self, count: int = 100, list_key: str = None): key = list_key @@ -172,49 +145,56 @@ class DBVidcon: print(result) return result['parameter'] if result else None - @redis_retry(max_retries=3) - def rollback_records(self, raws): - if isinstance(raws, str): - raws = [raws] - self.redis.lpush(self.l0_list_key, *raws) - - @redis_retry(max_retries=3) - def rollback_urgent(self, raws): - if isinstance(raws, str): - raws = [raws] - try: - self.redis.lpush(self.l0_list_key, *raws) - except redis.exceptions.ConnectionError as e: - print("[Redis urgent rollback error]", e) - self.reconnect_redis() - self.redis.lpush(self.l0_list_key, *raws) - @redis_retry(max_retries=3) def item_keyword(self, count: int = 20): try: - urgent_items = self.fetch_from_redis(count, list_key=self.l0_list_key) + items = self.fetch_from_redis(count, list_key=self.l0_list_key) except Exception as e: - print("[Redis urgent pop error]", e) + print("[Redis l0 pop error]", e) self.reconnect_redis() - urgent_items = [] - if urgent_items: - return urgent_items, 1 + items = [] + + if items: + return items, 0 try: items = self.fetch_from_redis(count, list_key=self.l1_list_key) except Exception as e: - print("[Redis normal pop error]", e) + print("[Redis l1 pop error]", e) self.reconnect_redis() - return [], 0 - return items, 2 + items = [] + + if items: + return items, 1 + try: + items = self.fetch_from_redis(count, list_key=self.l2_list_key) + except Exception as e: + print("[Redis l2 pop error]", e) + self.reconnect_redis() + items = [] + return items, 99 @redis_retry(max_retries=3) - def rollback(self, payloads): + def rollback_l1(self, payloads): if not payloads: return if isinstance(payloads, str): payloads = [payloads] self.redis.rpush(self.l1_list_key, *payloads) - print(f"[回滚] 已退回 {len(payloads)} 条") + print(f"[回滚l1] 已退回 {len(payloads)} 条") + + @redis_retry(max_retries=3) + def rollback_l0(self, raws): + if isinstance(raws, str): + raws = [raws] + self.redis.lpush(self.l0_list_key, *raws) + print(f"[回滚l0] 已退回 {len(raws)} 条") + + @redis_retry(max_retries=3) + def rollback_l2(self, raws): + if isinstance(raws, str): + raws = [raws] + self.redis.lpush(self.l2_list_key, *raws) + print(f"[回滚l2] 已退回 {len(raws)} 条") @mysql_retry() def upsert_video(self, data: dict): @@ -320,6 +300,7 @@ class DBVidcon: return ( self.redis.llen(self.l0_list_key) == 0 and self.redis.llen(self.l1_list_key) == 0 + and self.redis.llen(self.l2_list_key) == 0 ) @redis_retry(max_retries=3)