feat: 添加 Redis 重试机制以增强连接稳定性

This commit is contained in:
晓丰 2025-05-18 18:27:00 +08:00
parent fad30a1c21
commit 1737f87299

91
DB.py
View File

@ -2,6 +2,32 @@ import json
import redis import redis
import pymysql import pymysql
import time import time
import functools
from redis.exceptions import ConnectionError, TimeoutError
def redis_retry(max_retries: int = 3):
"""
装饰器工厂指定最大重试次数
"""
def decorator(fn):
@functools.wraps(fn)
def wrapper(self, *args, **kwargs):
for attempt in range(1, max_retries + 1):
try:
return fn(self, *args, **kwargs)
except (ConnectionError, TimeoutError) as e:
print(f"[Redis][{fn.__name__}] 第 {attempt} 次失败:{e}")
self.reconnect_redis()
if attempt == max_retries:
print("[Redis] 连接彻底失败")
raise
print(f"[Redis] 重连后第 {attempt + 1} 次重试…")
return wrapper
return decorator
class DBVidcon: class DBVidcon:
@ -41,42 +67,42 @@ class DBVidcon:
print("[Redis reconnect error]", e) print("[Redis reconnect error]", e)
time.sleep(2) time.sleep(2)
@redis_retry(max_retries=3)
def push_record(self, data: dict): def push_record(self, data: dict):
raw = json.dumps(data, ensure_ascii=False) raw = json.dumps(data, ensure_ascii=False)
try:
self.redis.lpush(self.error_list_key, raw)
except redis.exceptions.ConnectionError:
self.reconnect_redis()
self.redis.lpush(self.error_list_key, raw) self.redis.lpush(self.error_list_key, raw)
def fetch_records(self, count: int = 100):
try:
raws = self.redis.lpop(self.record_list_key, count)
except TypeError:
raws = []
for _ in range(count):
item = self.redis.rpop(self.record_list_key)
if item is None:
break
raws.append(item)
except redis.exceptions.ConnectionError:
self.reconnect_redis()
return self.fetch_records(count)
if not raws: # def fetch_records(self, count: int = 100):
return [] # try:
if isinstance(raws, str): # raws = self.redis.lpop(self.record_list_key, count)
raws = [raws] # except TypeError:
# raws = []
# for _ in range(count):
# item = self.redis.rpop(self.record_list_key)
# if item is None:
# break
# raws.append(item)
# except redis.exceptions.ConnectionError:
# self.reconnect_redis()
# return self.fetch_records(count)
#
# if not raws:
# return []
# if isinstance(raws, str):
# raws = [raws]
#
# out = []
# for raw in raws:
# try:
# data = json.loads(raw)
# out.append((raw, data))
# except json.JSONDecodeError:
# continue
# return out
out = []
for raw in raws:
try:
data = json.loads(raw)
out.append((raw, data))
except json.JSONDecodeError:
continue
return out
@redis_retry(max_retries=3)
def fetch_from_redis(self, count: int = 100, list_key: str = None): def fetch_from_redis(self, count: int = 100, list_key: str = None):
key = list_key key = list_key
try: try:
@ -118,11 +144,13 @@ class DBVidcon:
print(result) print(result)
return result['parameter'] if result else None return result['parameter'] if result else None
@redis_retry(max_retries=3)
def rollback_records(self, raws): def rollback_records(self, raws):
if isinstance(raws, str): if isinstance(raws, str):
raws = [raws] raws = [raws]
self.redis.lpush(self.urgent_list_key, *raws) self.redis.lpush(self.urgent_list_key, *raws)
@redis_retry(max_retries=3)
def rollback_urgent(self, raws): def rollback_urgent(self, raws):
if isinstance(raws, str): if isinstance(raws, str):
raws = [raws] raws = [raws]
@ -133,6 +161,7 @@ class DBVidcon:
self.reconnect_redis() self.reconnect_redis()
self.redis.lpush(self.urgent_list_key, *raws) self.redis.lpush(self.urgent_list_key, *raws)
@redis_retry(max_retries=3)
def item_keyword(self, count: int = 100): def item_keyword(self, count: int = 100):
try: try:
urgent_items = self.fetch_from_redis(count, list_key=self.urgent_list_key) urgent_items = self.fetch_from_redis(count, list_key=self.urgent_list_key)
@ -150,6 +179,7 @@ class DBVidcon:
return [], 0 return [], 0
return items, 2 return items, 2
@redis_retry(max_retries=3)
def rollback(self, payloads): def rollback(self, payloads):
if not payloads: if not payloads:
return return
@ -254,6 +284,7 @@ class DBVidcon:
self.cursor.close() self.cursor.close()
self.conn.close() self.conn.close()
@redis_retry(max_retries=3)
def get_proxy(self, region_code: str) -> str: def get_proxy(self, region_code: str) -> str:
""" """
Redis 队列 proxy_queue:<region_code> 弹出一个代理并返回 Redis 队列 proxy_queue:<region_code> 弹出一个代理并返回
@ -269,6 +300,7 @@ class DBVidcon:
break break
return proxy return proxy
@redis_retry(max_retries=3)
def queues_empty(self) -> bool: def queues_empty(self) -> bool:
""" """
判断 urgent_list_key list_key 两个队列是否都为空 判断 urgent_list_key list_key 两个队列是否都为空
@ -280,6 +312,7 @@ class DBVidcon:
and self.redis.llen(self.list_key) == 0 and self.redis.llen(self.list_key) == 0
) )
@redis_retry(max_retries=3)
def pop_error_item(self): def pop_error_item(self):
""" """
error_list_key 中弹出一个错误记录lpop error_list_key 中弹出一个错误记录lpop