fix: 添加 MySQL 重试机制以处理 InterfaceError 并确保连接稳定
This commit is contained in:
parent
90ad9c28ff
commit
64f8ed96a1
35
DB.py
35
DB.py
@ -6,6 +6,34 @@ import functools
|
|||||||
from redis.exceptions import ConnectionError, TimeoutError
|
from redis.exceptions import ConnectionError, TimeoutError
|
||||||
|
|
||||||
|
|
||||||
|
def mysql_retry(max_retries: int = 3, base_delay: float = 2.0):
|
||||||
|
"""
|
||||||
|
装饰器工厂:捕获 InterfaceError 后断线重连并重试,
|
||||||
|
重试间隔按指数级增长:base_delay * 2**(attempt-1) 秒。
|
||||||
|
"""
|
||||||
|
|
||||||
|
def decorator(fn):
|
||||||
|
@functools.wraps(fn)
|
||||||
|
def wrapper(self, *args, **kwargs):
|
||||||
|
for attempt in range(1, max_retries + 1):
|
||||||
|
try:
|
||||||
|
# 确保连接存活,reconnect=True 会在 ping 失败时重连
|
||||||
|
self.conn.ping(reconnect=True)
|
||||||
|
return fn(self, *args, **kwargs)
|
||||||
|
except pymysql.InterfaceError as e:
|
||||||
|
wait = base_delay * (2 ** (attempt - 1))
|
||||||
|
print(f"[MySQL][{fn.__name__}] 第{attempt}次 InterfaceError:{e},等待 {wait:.1f}s 后重连…")
|
||||||
|
time.sleep(wait)
|
||||||
|
self._reconnect_mysql()
|
||||||
|
if attempt == max_retries:
|
||||||
|
print("[MySQL] 重试多次仍失败,抛出异常")
|
||||||
|
raise
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
return decorator
|
||||||
|
|
||||||
|
|
||||||
def redis_retry(max_retries: int = 3):
|
def redis_retry(max_retries: int = 3):
|
||||||
"""
|
"""
|
||||||
装饰器工厂:指定最大重试次数。
|
装饰器工厂:指定最大重试次数。
|
||||||
@ -72,7 +100,6 @@ class DBVidcon:
|
|||||||
raw = json.dumps(data, ensure_ascii=False)
|
raw = json.dumps(data, ensure_ascii=False)
|
||||||
self.redis.lpush(self.error_list_key, raw)
|
self.redis.lpush(self.error_list_key, raw)
|
||||||
|
|
||||||
|
|
||||||
# def fetch_records(self, count: int = 100):
|
# def fetch_records(self, count: int = 100):
|
||||||
# try:
|
# try:
|
||||||
# raws = self.redis.lpop(self.record_list_key, count)
|
# raws = self.redis.lpop(self.record_list_key, count)
|
||||||
@ -101,7 +128,6 @@ class DBVidcon:
|
|||||||
# continue
|
# continue
|
||||||
# return out
|
# return out
|
||||||
|
|
||||||
|
|
||||||
@redis_retry(max_retries=3)
|
@redis_retry(max_retries=3)
|
||||||
def fetch_from_redis(self, count: int = 100, list_key: str = None):
|
def fetch_from_redis(self, count: int = 100, list_key: str = None):
|
||||||
key = list_key
|
key = list_key
|
||||||
@ -130,6 +156,7 @@ class DBVidcon:
|
|||||||
continue
|
continue
|
||||||
return out
|
return out
|
||||||
|
|
||||||
|
@mysql_retry()
|
||||||
def get_proxy_agent_dict(self) -> dict:
|
def get_proxy_agent_dict(self) -> dict:
|
||||||
sql = "SELECT rn, parameter FROM proxy_agent"
|
sql = "SELECT rn, parameter FROM proxy_agent"
|
||||||
self.cursor.execute(sql)
|
self.cursor.execute(sql)
|
||||||
@ -137,6 +164,7 @@ class DBVidcon:
|
|||||||
result = {row['rn']: row['parameter'] for row in rows}
|
result = {row['rn']: row['parameter'] for row in rows}
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
@mysql_retry()
|
||||||
def get_proxy_parameter(self, rn: str) -> str:
|
def get_proxy_parameter(self, rn: str) -> str:
|
||||||
sql = "SELECT parameter FROM proxy_agent WHERE rn = %s LIMIT 1"
|
sql = "SELECT parameter FROM proxy_agent WHERE rn = %s LIMIT 1"
|
||||||
self.cursor.execute(sql, (rn,))
|
self.cursor.execute(sql, (rn,))
|
||||||
@ -188,6 +216,7 @@ class DBVidcon:
|
|||||||
self.redis.rpush(self.list_key, *payloads)
|
self.redis.rpush(self.list_key, *payloads)
|
||||||
print(f"[回滚] 已退回 {len(payloads)} 条")
|
print(f"[回滚] 已退回 {len(payloads)} 条")
|
||||||
|
|
||||||
|
@mysql_retry()
|
||||||
def upsert_video(self, data: dict):
|
def upsert_video(self, data: dict):
|
||||||
data.setdefault("a_id", 0)
|
data.setdefault("a_id", 0)
|
||||||
data.setdefault("history_status", "")
|
data.setdefault("history_status", "")
|
||||||
@ -276,10 +305,12 @@ class DBVidcon:
|
|||||||
print("[交由Redis处理]")
|
print("[交由Redis处理]")
|
||||||
break
|
break
|
||||||
|
|
||||||
|
@mysql_retry()
|
||||||
def flush(self):
|
def flush(self):
|
||||||
"""批量执行完后手动提交。"""
|
"""批量执行完后手动提交。"""
|
||||||
self.conn.commit()
|
self.conn.commit()
|
||||||
|
|
||||||
|
@mysql_retry()
|
||||||
def close(self):
|
def close(self):
|
||||||
self.cursor.close()
|
self.cursor.close()
|
||||||
self.conn.close()
|
self.conn.close()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user