feat: 添加视频队列键名和回滚逻辑,优化视频处理流程

This commit is contained in:
晓丰 2025-05-20 21:29:11 +08:00
parent b5f611f21c
commit b54f30c200

95
DB.py
View File

@ -76,9 +76,10 @@ class DBVidcon:
}
def __init__(self):
self.l1_list_key = "video_l1_queue"
self.error_list_key = "error_save_queue"
self.l0_list_key = "video_l0_queue"
self.l1_list_key = "video_l1_queue"
self.l2_list_key = "video_l2_queue"
self.error_list_key = "error_save_queue"
self.conn = pymysql.connect(**self._MYSQL_CONF)
self.cursor = self.conn.cursor()
self.redis = redis.Redis(**self._REDIS_CONF)
@ -100,34 +101,6 @@ class DBVidcon:
raw = json.dumps(data, ensure_ascii=False)
self.redis.lpush(self.error_list_key, raw)
# def fetch_records(self, count: int = 100):
# try:
# raws = self.redis.lpop(self.record_list_key, count)
# except TypeError:
# raws = []
# for _ in range(count):
# item = self.redis.rpop(self.record_list_key)
# if item is None:
# break
# raws.append(item)
# except redis.exceptions.ConnectionError:
# self.reconnect_redis()
# return self.fetch_records(count)
#
# if not raws:
# return []
# if isinstance(raws, str):
# raws = [raws]
#
# out = []
# for raw in raws:
# try:
# data = json.loads(raw)
# out.append((raw, data))
# except json.JSONDecodeError:
# continue
# return out
@redis_retry(max_retries=3)
def fetch_from_redis(self, count: int = 100, list_key: str = None):
key = list_key
@ -172,49 +145,56 @@ class DBVidcon:
print(result)
return result['parameter'] if result else None
@redis_retry(max_retries=3)
def rollback_records(self, raws):
if isinstance(raws, str):
raws = [raws]
self.redis.lpush(self.l0_list_key, *raws)
@redis_retry(max_retries=3)
def rollback_urgent(self, raws):
if isinstance(raws, str):
raws = [raws]
try:
self.redis.lpush(self.l0_list_key, *raws)
except redis.exceptions.ConnectionError as e:
print("[Redis urgent rollback error]", e)
self.reconnect_redis()
self.redis.lpush(self.l0_list_key, *raws)
@redis_retry(max_retries=3)
def item_keyword(self, count: int = 20):
try:
urgent_items = self.fetch_from_redis(count, list_key=self.l0_list_key)
items = self.fetch_from_redis(count, list_key=self.l0_list_key)
except Exception as e:
print("[Redis urgent pop error]", e)
print("[Redis l0 pop error]", e)
self.reconnect_redis()
urgent_items = []
if urgent_items:
return urgent_items, 1
items = []
if items:
return items, 0
try:
items = self.fetch_from_redis(count, list_key=self.l1_list_key)
except Exception as e:
print("[Redis normal pop error]", e)
print("[Redis l1 pop error]", e)
self.reconnect_redis()
return [], 0
return items, 2
items = []
if items:
return items, 1
try:
items = self.fetch_from_redis(count, list_key=self.l2_list_key)
except Exception as e:
print("[Redis l2 pop error]", e)
self.reconnect_redis()
items = []
return items, 99
@redis_retry(max_retries=3)
def rollback(self, payloads):
def rollback_l1(self, payloads):
if not payloads:
return
if isinstance(payloads, str):
payloads = [payloads]
self.redis.rpush(self.l1_list_key, *payloads)
print(f"[回滚] 已退回 {len(payloads)}")
print(f"[回滚l1] 已退回 {len(payloads)}")
@redis_retry(max_retries=3)
def rollback_l0(self, raws):
if isinstance(raws, str):
raws = [raws]
self.redis.lpush(self.l0_list_key, *raws)
print(f"[回滚l0] 已退回 {len(raws)}")
@redis_retry(max_retries=3)
def rollback_l2(self, raws):
if isinstance(raws, str):
raws = [raws]
self.redis.lpush(self.l2_list_key, *raws)
print(f"[回滚l2] 已退回 {len(raws)}")
@mysql_retry()
def upsert_video(self, data: dict):
@ -320,6 +300,7 @@ class DBVidcon:
return (
self.redis.llen(self.l0_list_key) == 0
and self.redis.llen(self.l1_list_key) == 0
and self.redis.llen(self.l2_list_key) == 0
)
@redis_retry(max_retries=3)