diff --git a/DB.py b/DB.py index 3d93a17..2513e8c 100644 --- a/DB.py +++ b/DB.py @@ -76,9 +76,10 @@ class DBVidcon: } def __init__(self): - self.l1_list_key = "video_l1_queue" - self.error_list_key = "error_save_queue" self.l0_list_key = "video_l0_queue" + self.l1_list_key = "video_l1_queue" + self.l2_list_key = "video_l2_queue" + self.error_list_key = "error_save_queue" self.conn = pymysql.connect(**self._MYSQL_CONF) self.cursor = self.conn.cursor() self.redis = redis.Redis(**self._REDIS_CONF) @@ -100,34 +101,6 @@ class DBVidcon: raw = json.dumps(data, ensure_ascii=False) self.redis.lpush(self.error_list_key, raw) - # def fetch_records(self, count: int = 100): - # try: - # raws = self.redis.lpop(self.record_list_key, count) - # except TypeError: - # raws = [] - # for _ in range(count): - # item = self.redis.rpop(self.record_list_key) - # if item is None: - # break - # raws.append(item) - # except redis.exceptions.ConnectionError: - # self.reconnect_redis() - # return self.fetch_records(count) - # - # if not raws: - # return [] - # if isinstance(raws, str): - # raws = [raws] - # - # out = [] - # for raw in raws: - # try: - # data = json.loads(raw) - # out.append((raw, data)) - # except json.JSONDecodeError: - # continue - # return out - @redis_retry(max_retries=3) def fetch_from_redis(self, count: int = 100, list_key: str = None): key = list_key @@ -172,49 +145,56 @@ class DBVidcon: print(result) return result['parameter'] if result else None - @redis_retry(max_retries=3) - def rollback_records(self, raws): - if isinstance(raws, str): - raws = [raws] - self.redis.lpush(self.l0_list_key, *raws) - - @redis_retry(max_retries=3) - def rollback_urgent(self, raws): - if isinstance(raws, str): - raws = [raws] - try: - self.redis.lpush(self.l0_list_key, *raws) - except redis.exceptions.ConnectionError as e: - print("[Redis urgent rollback error]", e) - self.reconnect_redis() - self.redis.lpush(self.l0_list_key, *raws) - @redis_retry(max_retries=3) def item_keyword(self, count: int = 20): try: - urgent_items = self.fetch_from_redis(count, list_key=self.l0_list_key) + items = self.fetch_from_redis(count, list_key=self.l0_list_key) except Exception as e: - print("[Redis urgent pop error]", e) + print("[Redis l0 pop error]", e) self.reconnect_redis() - urgent_items = [] - if urgent_items: - return urgent_items, 1 + items = [] + + if items: + return items, 0 try: items = self.fetch_from_redis(count, list_key=self.l1_list_key) except Exception as e: - print("[Redis normal pop error]", e) + print("[Redis l1 pop error]", e) self.reconnect_redis() - return [], 0 - return items, 2 + items = [] + + if items: + return items, 1 + try: + items = self.fetch_from_redis(count, list_key=self.l2_list_key) + except Exception as e: + print("[Redis l2 pop error]", e) + self.reconnect_redis() + items = [] + return items, 99 @redis_retry(max_retries=3) - def rollback(self, payloads): + def rollback_l1(self, payloads): if not payloads: return if isinstance(payloads, str): payloads = [payloads] self.redis.rpush(self.l1_list_key, *payloads) - print(f"[回滚] 已退回 {len(payloads)} 条") + print(f"[回滚l1] 已退回 {len(payloads)} 条") + + @redis_retry(max_retries=3) + def rollback_l0(self, raws): + if isinstance(raws, str): + raws = [raws] + self.redis.lpush(self.l0_list_key, *raws) + print(f"[回滚l0] 已退回 {len(raws)} 条") + + @redis_retry(max_retries=3) + def rollback_l2(self, raws): + if isinstance(raws, str): + raws = [raws] + self.redis.lpush(self.l2_list_key, *raws) + print(f"[回滚l2] 已退回 {len(raws)} 条") @mysql_retry() def upsert_video(self, data: dict): @@ -320,6 +300,7 @@ class DBVidcon: return ( self.redis.llen(self.l0_list_key) == 0 and self.redis.llen(self.l1_list_key) == 0 + and self.redis.llen(self.l2_list_key) == 0 ) @redis_retry(max_retries=3)