feat: 添加 l0、l1 和 l2 队列的推送方法,优化数据处理流程

This commit is contained in:
晓丰 2025-05-20 21:43:00 +08:00
parent 0903a00ed2
commit 49c2e1d43c

26
DB.py
View File

@ -129,6 +129,30 @@ class DBVidcon:
continue continue
return out return out
@redis_retry(max_retries=3)
def push_l0(self, raws):
"""向 l0加急队列写入数据"""
if isinstance(raws, str):
raws = [raws]
self.redis.lpush(self.l0_list_key, *raws)
print(f"[写入l0] 已推入 {len(raws)}")
@redis_retry(max_retries=3)
def push_l1(self, payloads):
"""向 l1普通队列写入数据"""
if isinstance(payloads, str):
payloads = [payloads]
self.redis.rpush(self.l1_list_key, *payloads)
print(f"[写入l1] 已推入 {len(payloads)}")
@redis_retry(max_retries=3)
def push_l2(self, raws):
"""向 l2低优先队列写入数据"""
if isinstance(raws, str):
raws = [raws]
self.redis.lpush(self.l2_list_key, *raws)
print(f"[写入l2] 已推入 {len(raws)}")
@mysql_retry() @mysql_retry()
def get_proxy_agent_dict(self) -> dict: def get_proxy_agent_dict(self) -> dict:
sql = "SELECT rn, parameter FROM proxy_agent" sql = "SELECT rn, parameter FROM proxy_agent"
@ -175,8 +199,6 @@ class DBVidcon:
@redis_retry(max_retries=3) @redis_retry(max_retries=3)
def rollback_l1(self, payloads): def rollback_l1(self, payloads):
if not payloads:
return
if isinstance(payloads, str): if isinstance(payloads, str):
payloads = [payloads] payloads = [payloads]
self.redis.rpush(self.l1_list_key, *payloads) self.redis.rpush(self.l1_list_key, *payloads)