diff --git a/DB.py b/DB.py index d7ca25f..81d711c 100644 --- a/DB.py +++ b/DB.py @@ -2,6 +2,32 @@ import json import redis import pymysql import time +import functools +from redis.exceptions import ConnectionError, TimeoutError + + +def redis_retry(max_retries: int = 3): + """ + 装饰器工厂:指定最大重试次数。 + """ + + def decorator(fn): + @functools.wraps(fn) + def wrapper(self, *args, **kwargs): + for attempt in range(1, max_retries + 1): + try: + return fn(self, *args, **kwargs) + except (ConnectionError, TimeoutError) as e: + print(f"[Redis][{fn.__name__}] 第 {attempt} 次失败:{e}") + self.reconnect_redis() + if attempt == max_retries: + print("[Redis] 连接彻底失败") + raise + print(f"[Redis] 重连后第 {attempt + 1} 次重试…") + + return wrapper + + return decorator class DBVidcon: @@ -41,42 +67,42 @@ class DBVidcon: print("[Redis reconnect error]", e) time.sleep(2) + @redis_retry(max_retries=3) def push_record(self, data: dict): raw = json.dumps(data, ensure_ascii=False) - try: - self.redis.lpush(self.error_list_key, raw) - except redis.exceptions.ConnectionError: - self.reconnect_redis() - self.redis.lpush(self.error_list_key, raw) + self.redis.lpush(self.error_list_key, raw) - def fetch_records(self, count: int = 100): - try: - raws = self.redis.lpop(self.record_list_key, count) - except TypeError: - raws = [] - for _ in range(count): - item = self.redis.rpop(self.record_list_key) - if item is None: - break - raws.append(item) - except redis.exceptions.ConnectionError: - self.reconnect_redis() - return self.fetch_records(count) - if not raws: - return [] - if isinstance(raws, str): - raws = [raws] + # def fetch_records(self, count: int = 100): + # try: + # raws = self.redis.lpop(self.record_list_key, count) + # except TypeError: + # raws = [] + # for _ in range(count): + # item = self.redis.rpop(self.record_list_key) + # if item is None: + # break + # raws.append(item) + # except redis.exceptions.ConnectionError: + # self.reconnect_redis() + # return self.fetch_records(count) + # + # if not raws: + # return [] + # if isinstance(raws, str): + # raws = [raws] + # + # out = [] + # for raw in raws: + # try: + # data = json.loads(raw) + # out.append((raw, data)) + # except json.JSONDecodeError: + # continue + # return out - out = [] - for raw in raws: - try: - data = json.loads(raw) - out.append((raw, data)) - except json.JSONDecodeError: - continue - return out + @redis_retry(max_retries=3) def fetch_from_redis(self, count: int = 100, list_key: str = None): key = list_key try: @@ -118,11 +144,13 @@ class DBVidcon: print(result) return result['parameter'] if result else None + @redis_retry(max_retries=3) def rollback_records(self, raws): if isinstance(raws, str): raws = [raws] self.redis.lpush(self.urgent_list_key, *raws) + @redis_retry(max_retries=3) def rollback_urgent(self, raws): if isinstance(raws, str): raws = [raws] @@ -133,6 +161,7 @@ class DBVidcon: self.reconnect_redis() self.redis.lpush(self.urgent_list_key, *raws) + @redis_retry(max_retries=3) def item_keyword(self, count: int = 100): try: urgent_items = self.fetch_from_redis(count, list_key=self.urgent_list_key) @@ -150,6 +179,7 @@ class DBVidcon: return [], 0 return items, 2 + @redis_retry(max_retries=3) def rollback(self, payloads): if not payloads: return @@ -254,6 +284,7 @@ class DBVidcon: self.cursor.close() self.conn.close() + @redis_retry(max_retries=3) def get_proxy(self, region_code: str) -> str: """ 从 Redis 队列 proxy_queue: 弹出一个代理并返回。 @@ -269,6 +300,7 @@ class DBVidcon: break return proxy + @redis_retry(max_retries=3) def queues_empty(self) -> bool: """ 判断 urgent_list_key 和 list_key 两个队列是否都为空。 @@ -280,6 +312,7 @@ class DBVidcon: and self.redis.llen(self.list_key) == 0 ) + @redis_retry(max_retries=3) def pop_error_item(self): """ 从 error_list_key 中弹出一个错误记录(lpop),