feat: add video duplicate detection, with video keys stored in Redis

This commit is contained in:
晓丰 2025-06-02 22:23:44 +08:00
parent 59c54d667c
commit 12915c13a5
3 changed files with 81 additions and 47 deletions
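In outline: before a video is queued, the crawler checks a Redis hash of already-stored (v_xid, rn) keys and records the result in an is_repeat field; DB.py gains the helpers for that hash plus a streaming export of existing keys from MySQL. A minimal sketch of the crawl-time check, assuming the DBVidcon helpers added below (the key values are made up):

    from DB import DBVidcon

    db = DBVidcon()
    v_xid, rn = "x8abcd", "US"  # made-up video id and region code
    is_repeat = 1 if db.video_key_exists(v_xid, rn) else 0
    # the crawler attaches is_repeat to the payload passed to DBSA.upsert_video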

DB.py
View File

@@ -152,6 +152,7 @@ class DBVidcon:
"password": "qwert@$123!&",
"decode_responses": True,
}
VIDEO_HASH_KEY = "video_keys"
def __init__(self):
self.l0_list_key = "video_l0_queue"
@@ -500,6 +501,39 @@ class DBVidcon:
# if the stored value is a JSON string, deserialize it here:
return json.loads(item) if item is not None else None
# =========================
@redis_retry(max_retries=3)
def cache_video_key(self, v_xid: str, rn: str):
"""把 (v_xid,rn) 写进 Redis 哈希,幂等。"""
if v_xid and rn:
self.redis.hset(self.VIDEO_HASH_KEY, f"{v_xid}:{rn}", 1)
@redis_retry(max_retries=3)
def cache_video_keys_bulk(self, pairs: List[tuple]):
"""pairs = [(vxid, rn), …] 批量写入"""
if not pairs:
return
mapping = {f"{vx}:{rn}": 1 for vx, rn in pairs if vx and rn}
if mapping:
self.redis.hset(self.VIDEO_HASH_KEY, mapping=mapping)
@redis_retry(max_retries=3)
def video_key_exists(self, v_xid: str, rn: str) -> bool:
"""Redis 判断 (vxid,rn) 是否已存在"""
return bool(self.redis.hexists(self.VIDEO_HASH_KEY, f"{v_xid}:{rn}"))
@redis_retry(max_retries=3)
def video_keys_exist_bulk(self, pairs: List[tuple]) -> set:
"""
返回已存在的键集合 {"vxid:rn", }
"""
pipe = self.redis.pipeline()
for vx, rn in pairs:
pipe.hexists(self.VIDEO_HASH_KEY, f"{vx}:{rn}")
flags = pipe.execute()
return {f"{vx}:{rn}" for (vx, rn), flag in zip(pairs, flags) if flag}
class DBSA:
FLUSH_EVERY_ROWS = 100 # row threshold
@@ -602,36 +636,6 @@ class DBSA:
if not op_rows and not vid_rows:
return
existing_keys = set()
if vid_rows:
# collect (v_xid, rn) pairs; these should match the unique index on the video table
all_keys = list({(row["v_xid"], row["rn"]) for row in vid_rows})
conn = _engine.connect()
try:
sel_vid = (
video.select()
.with_only_columns(video.c.v_xid, video.c.rn)
.where(tuple_(video.c.v_xid, video.c.rn).in_(all_keys))
)
existing_keys = {(row.v_xid, row.rn) for row in conn.execute(sel_vid).fetchall()}
except Exception as e:
logger.info(f"[DBSA] 查询 video 表时异常: {e}")
try:
cls.push_record_many(payloads)
except Exception as re:
logger.info("[Redis 回退失败]", re)
return
finally:
conn.close()
for i, vid_row in enumerate(vid_rows):
key = (vid_row["v_xid"], vid_row["rn"])
if key in existing_keys:
op_rows[i]["is_repeat"] = 1
else:
op_rows[i]["is_repeat"] = 2
# the author, log, and video table writes below are unchanged...
authors_map = {}
now_ts = int(time.time())
for data in payloads:
@@ -645,8 +649,6 @@ class DBSA:
"u_pic": data.get("u_pic"),
"follow_number": data.get("fans", 0),
"v_number": data.get("videos", 0),
"pv_number": 0,
"b_number": 0,
"create_time": datetime.utcnow(),
"update_time": now_ts
}
@@ -659,8 +661,6 @@ class DBSA:
"u_pic": stmt_author.inserted.u_pic,
"follow_number": stmt_author.inserted.follow_number,
"v_number": stmt_author.inserted.v_number,
"pv_number": stmt_author.inserted.pv_number,
"b_number": stmt_author.inserted.b_number,
"update_time": stmt_author.inserted.update_time,
}
ondup_author = stmt_author.on_duplicate_key_update(**upd_author)
@@ -686,13 +686,21 @@ class DBSA:
logger.info("[Redis 回退失败]", re)
return
for vid_row in vid_rows:
# keep the is_repeat flag; used below to identify newly inserted rows
new_pairs = []
for i, vid_row in enumerate(vid_rows):
if op_rows[i].get("is_repeat") == 2:
new_pairs.append((vid_row["v_xid"], vid_row["rn"]))
vid_row.pop("is_repeat", None)
vid_row.pop("level", None)
if vid_rows:
try:
cls._bulk_upsert(vid_rows)
if new_pairs:
db = DBVidcon()
db.cache_video_keys_bulk(new_pairs)
logger.info(f"[DBSA] 同步 {len(new_pairs)} 条新视频到 Redis video_keys")
logger.info(
f"[DBSA] flush done: authors={len(author_rows)}, op={len(op_rows)}, video={len(vid_rows)}"
)
@@ -737,3 +745,21 @@ class DBSA:
daemon=True
)
thread.start()
@classmethod
def stream_video_keys(cls, chunk_size: int = 10_000):
"""yield [(vxid, rn), …] 批次,纯 SQLAlchemy无 Redis 依赖"""
with _engine.connect() as conn:
result = (
conn.execution_options(stream_results=True)
.execute(video.select()
.with_only_columns(video.c.v_xid, video.c.rn))
)
batch = []
for vxid, rn in result:
batch.append((vxid, rn))
if len(batch) >= chunk_size:
yield batch
batch = []
if batch:
yield batch
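stream_video_keys pairs with cache_video_keys_bulk to warm the Redis hash from MySQL in bounded chunks, which is what the updated script in the next file does; a sketch, assuming both classes are importable from DB:

    from DB import DBVidcon, DBSA

    db = DBVidcon()
    total = 0
    for chunk in DBSA.stream_video_keys(chunk_size=10_000):
        db.cache_video_keys_bulk(chunk)  # one HSET mapping per chunk
        total += len(chunk)
    print(f"seeded {total} video keys into Redis")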

View File

@@ -1,6 +1,6 @@
import json, time
import argparse
from DB import DBVidcon
from DB import DBVidcon, DBSA
def parse_args():
parser = argparse.ArgumentParser(
@@ -14,8 +14,10 @@ def main():
args = parse_args()
batch = int(time.time())
db = DBVidcon()
push = None
empty = None
for chunk in DBSA.stream_video_keys(chunk_size=10_000):
db.cache_video_keys_bulk(chunk)
print(f"同步Redis=={len(chunk)}")
if args.level == 0:
push = db.push_l0

main.py
View File

@@ -55,7 +55,7 @@ def format_duration(seconds):
return "00:00"
def get_searchInfo(keyword, level, headers, proxy_name, r=2):
def get_searchInfo(keyword, level, rn, proxy_name, r=2):
if r == 2:
logger.info(f"NET处理->{keyword},\trn->{proxy_name},\tlevel->{level}")
video_list = []
@@ -87,7 +87,7 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
logger.exception(f"[Requested] 未知:{e}, keyword: {keyword}, l: {level}")
else:
time.sleep((3 - r) * 5)
return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
return get_searchInfo(keyword, level, rn, proxy_name, r - 1)
try:
resinfo = jsondata.get("list")
except Exception:
@@ -97,7 +97,7 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
return None
else:
time.sleep((3 - r) * 5)
return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
return get_searchInfo(keyword, level, rn, proxy_name, r - 1)
for index, iteminfo in enumerate(resinfo):
calculated_index = index + 1 + (j - 1) * limit
xid = iteminfo["id"]
@@ -105,6 +105,10 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
uxid = iteminfo["owner.id"]
uid = base64.b64encode(f"Channel:{uxid}".encode('utf-8')).decode('utf-8')
duration = iteminfo.get('duration')
is_repeat = 0
if db.video_key_exists(vid.strip(), rn):
is_repeat = 1
if duration <= 300:
continue
v_data = {
@@ -122,10 +126,13 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
"u_id": uid,
"u_xid": uxid,
"u_name": iteminfo.get('owner.screenname'),
"u_pic": iteminfo.get('owner.avatar_60_url')
"u_pic": iteminfo.get('owner.avatar_60_url'),
"is_repeat": is_repeat,
}
video_list.append(v_data)
time.sleep(3)
if len(video_list) < 100:
break
return video_list
@@ -135,11 +142,11 @@ proxiesdict = db.get_proxy_agent_dict()
def search_worker(payload, kitem, flag):
try:
gproxies = proxiesdict[kitem['rn']]
v_list = get_searchInfo(kitem['keyword'], kitem['level'], None, gproxies)
v_list = get_searchInfo(kitem['keyword'], kitem['level'], kitem['rn'], gproxies)
if not v_list:
for i in range(2):
time.sleep(i * 5)
v_list = get_searchInfo(kitem['keyword'], kitem['level'], None, gproxies)
v_list = get_searchInfo(kitem['keyword'], kitem['level'], kitem['rn'], gproxies)
if v_list:
break
time.sleep(2)
@@ -175,8 +182,6 @@ def integrate_data_parallel():
continue
for item in v_list:
if not v_list:
continue
DBSA.upsert_video({
"keyword": kitem["keyword"],
"v_name": kitem["v_name"],
@@ -199,6 +204,7 @@ def integrate_data_parallel():
"batch": kitem["batch"],
"machine_id": MACHINE_ID,
"level": kitem["level"],
"is_repeat": item['is_repeat']
})
DBSA.flush()
if rollback[0]: