From 12915c13a563c5338f4e263034cbf84a7ccfe9c9 Mon Sep 17 00:00:00 2001
From: Franklin-F
Date: Mon, 2 Jun 2025 22:23:44 +0800
Subject: [PATCH] feat: add video duplicate detection with Redis-backed video key storage
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 DB.py                 | 96 +++++++++++++++++++++++++++----------------
 dump_keyword_title.py |  8 ++--
 main.py               | 24 +++++++----
 3 files changed, 81 insertions(+), 47 deletions(-)

diff --git a/DB.py b/DB.py
index 1dc90d8..d5732ba 100644
--- a/DB.py
+++ b/DB.py
@@ -152,6 +152,7 @@ class DBVidcon:
         "password": "qwert@$123!&",
         "decode_responses": True,
     }
+    VIDEO_HASH_KEY = "video_keys"
 
     def __init__(self):
         self.l0_list_key = "video_l0_queue"
@@ -500,6 +501,39 @@ class DBVidcon:
         # 如果你存入的是 JSON 字符串，可以在这里做一次反序列化：
         return json.loads(item) if item is not None else None
 
+    # =========================
+
+    @redis_retry(max_retries=3)
+    def cache_video_key(self, v_xid: str, rn: str):
+        """Write (v_xid, rn) into the Redis hash; idempotent."""
+        if v_xid and rn:
+            self.redis.hset(self.VIDEO_HASH_KEY, f"{v_xid}:{rn}", 1)
+
+    @redis_retry(max_retries=3)
+    def cache_video_keys_bulk(self, pairs: List[tuple]):
+        """Bulk write; pairs = [(vxid, rn), ...]"""
+        if not pairs:
+            return
+        mapping = {f"{vx}:{rn}": 1 for vx, rn in pairs if vx and rn}
+        if mapping:
+            self.redis.hset(self.VIDEO_HASH_KEY, mapping=mapping)
+
+    @redis_retry(max_retries=3)
+    def video_key_exists(self, v_xid: str, rn: str) -> bool:
+        """Check in Redis whether (v_xid, rn) already exists."""
+        return bool(self.redis.hexists(self.VIDEO_HASH_KEY, f"{v_xid}:{rn}"))
+
+    @redis_retry(max_retries=3)
+    def video_keys_exist_bulk(self, pairs: List[tuple]) -> set:
+        """
+        Return the set of keys that already exist: {"vxid:rn", ...}
+        """
+        pipe = self.redis.pipeline()
+        for vx, rn in pairs:
+            pipe.hexists(self.VIDEO_HASH_KEY, f"{vx}:{rn}")
+        flags = pipe.execute()
+        return {f"{vx}:{rn}" for (vx, rn), flag in zip(pairs, flags) if flag}
+
 
 class DBSA:
     FLUSH_EVERY_ROWS = 100   # 行阈值
@@ -602,36 +636,6 @@ class DBSA:
         if not op_rows and not vid_rows:
             return
 
-        existing_keys = set()
-        if vid_rows:
-            # 收集 (v_xid, rn) 对,应与 video 表中的唯一索引匹配
-            all_keys = list({(row["v_xid"], row["rn"]) for row in vid_rows})
-            conn = _engine.connect()
-            try:
-                sel_vid = (
-                    video.select()
-                    .with_only_columns(video.c.v_xid, video.c.rn)
-                    .where(tuple_(video.c.v_xid, video.c.rn).in_(all_keys))
-                )
-                existing_keys = {(row.v_xid, row.rn) for row in conn.execute(sel_vid).fetchall()}
-            except Exception as e:
-                logger.info(f"[DBSA] 查询 video 表时异常: {e}")
-                try:
-                    cls.push_record_many(payloads)
-                except Exception as re:
-                    logger.info("[Redis 回退失败]", re)
-                return
-            finally:
-                conn.close()
-
-        for i, vid_row in enumerate(vid_rows):
-            key = (vid_row["v_xid"], vid_row["rn"])
-            if key in existing_keys:
-                op_rows[i]["is_repeat"] = 1
-            else:
-                op_rows[i]["is_repeat"] = 2
-
-        # 以下作者表、日志表和视频表写入逻辑保持不变...
         authors_map = {}
         now_ts = int(time.time())
         for data in payloads:
@@ -645,8 +649,6 @@ class DBSA:
                 "u_pic": data.get("u_pic"),
                 "follow_number": data.get("fans", 0),
                 "v_number": data.get("videos", 0),
-                "pv_number": 0,
-                "b_number": 0,
                 "create_time": datetime.utcnow(),
                 "update_time": now_ts
             }
@@ -659,8 +661,6 @@ class DBSA:
             "u_pic": stmt_author.inserted.u_pic,
             "follow_number": stmt_author.inserted.follow_number,
             "v_number": stmt_author.inserted.v_number,
-            "pv_number": stmt_author.inserted.pv_number,
-            "b_number": stmt_author.inserted.b_number,
             "update_time": stmt_author.inserted.update_time,
         }
         ondup_author = stmt_author.on_duplicate_key_update(**upd_author)
@@ -686,13 +686,21 @@ class DBSA:
                 logger.info("[Redis 回退失败]", re)
             return
 
-        for vid_row in vid_rows:
+        # Collect keys of rows not flagged as repeats; they are new and will be cached in Redis
+        new_pairs = []
+        for i, vid_row in enumerate(vid_rows):
+            if op_rows[i].get("is_repeat") != 1:
+                new_pairs.append((vid_row["v_xid"], vid_row["rn"]))
             vid_row.pop("is_repeat", None)
             vid_row.pop("level", None)
 
         if vid_rows:
             try:
                 cls._bulk_upsert(vid_rows)
+                if new_pairs:
+                    db = DBVidcon()
+                    db.cache_video_keys_bulk(new_pairs)
+                    logger.info(f"[DBSA] synced {len(new_pairs)} new videos to Redis video_keys")
                 logger.info(
                     f"[DBSA] flush 完成：authors={len(author_rows)} 条，op={len(op_rows)} 条，video={len(vid_rows)} 条"
                 )
@@ -737,3 +745,21 @@ class DBSA:
             daemon=True
         )
         thread.start()
+
+    @classmethod
+    def stream_video_keys(cls, chunk_size: int = 10_000):
+        """Yield batches of [(vxid, rn), ...]; pure SQLAlchemy, no Redis dependency."""
+        with _engine.connect() as conn:
+            result = (
+                conn.execution_options(stream_results=True)
+                .execute(video.select()
+                         .with_only_columns(video.c.v_xid, video.c.rn))
+            )
+            batch = []
+            for vxid, rn in result:
+                batch.append((vxid, rn))
+                if len(batch) >= chunk_size:
+                    yield batch
+                    batch = []
+            if batch:
+                yield batch
\ No newline at end of file
diff --git a/dump_keyword_title.py b/dump_keyword_title.py
index dcdbf3f..bb74ac6 100644
--- a/dump_keyword_title.py
+++ b/dump_keyword_title.py
@@ -1,6 +1,6 @@
 import json, time
 import argparse
-from DB import DBVidcon
+from DB import DBVidcon, DBSA
 
 def parse_args():
     parser = argparse.ArgumentParser(
@@ -14,8 +14,10 @@ def main():
     args = parse_args()
     batch = int(time.time())
     db = DBVidcon()
-    push = None
-    empty = None
+
+    for chunk in DBSA.stream_video_keys(chunk_size=10_000):
+        db.cache_video_keys_bulk(chunk)
+        print(f"synced to Redis=={len(chunk)}")
 
     if args.level == 0:
         push = db.push_l0
diff --git a/main.py b/main.py
index 809aa36..f6241c4 100644
--- a/main.py
+++ b/main.py
@@ -55,7 +55,7 @@ def format_duration(seconds):
     return "00:00"
 
 
-def get_searchInfo(keyword, level, headers, proxy_name, r=2):
+def get_searchInfo(keyword, level, rn, proxy_name, r=2):
     if r == 2:
         logger.info(f"NET处理->{keyword},\trn->{proxy_name},\tlevel->{level}")
     video_list = []
@@ -87,7 +87,7 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
                 logger.exception(f"[Requested] 未知:{e}, keyword: {keyword}, l: {level}")
             else:
                 time.sleep((3 - r) * 5)
-                return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
+                return get_searchInfo(keyword, level, rn, proxy_name, r - 1)
         try:
             resinfo = jsondata.get("list")
         except Exception:
@@ -97,7 +97,7 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
                 return None
             else:
                 time.sleep((3 - r) * 5)
-                return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
+                return get_searchInfo(keyword, level, rn, proxy_name, r - 1)
         for index, iteminfo in enumerate(resinfo):
             calculated_index = index + 1 + (j - 1) * limit
             xid = iteminfo["id"]
@@ -105,6 +105,10 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
             uxid = iteminfo["owner.id"]
             uid = base64.b64encode(f"Channel:{uxid}".encode('utf-8')).decode('utf-8')
             duration = iteminfo.get('duration')
+            is_repeat = 0
+            if db.video_key_exists(vid.strip(), rn):
+                is_repeat = 1
+
             if duration <= 300:
                 continue
             v_data = {
@@ -122,10 +126,13 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
                 "u_id": uid,
                 "u_xid": uxid,
                 "u_name": iteminfo.get('owner.screenname'),
-                "u_pic": iteminfo.get('owner.avatar_60_url')
+                "u_pic": iteminfo.get('owner.avatar_60_url'),
+                "is_repeat": is_repeat,
             }
             video_list.append(v_data)
-            time.sleep(3)
+        time.sleep(3)
+        if len(video_list) < 100:
+            break
 
     return video_list
 
@@ -135,11 +142,11 @@ proxiesdict = db.get_proxy_agent_dict()
 def search_worker(payload, kitem, flag):
     try:
         gproxies = proxiesdict[kitem['rn']]
-        v_list = get_searchInfo(kitem['keyword'], kitem['level'], None, gproxies)
+        v_list = get_searchInfo(kitem['keyword'], kitem['level'], kitem['rn'], gproxies)
         if not v_list:
             for i in range(2):
                 time.sleep(i * 5)
-                v_list = get_searchInfo(kitem['keyword'], kitem['level'], None, gproxies)
+                v_list = get_searchInfo(kitem['keyword'], kitem['level'], kitem['rn'], gproxies)
                 if v_list:
                     break
         time.sleep(2)
@@ -175,8 +182,6 @@ def integrate_data_parallel():
             continue
 
         for item in v_list:
-            if not v_list:
-                continue
             DBSA.upsert_video({
                 "keyword": kitem["keyword"],
                 "v_name": kitem["v_name"],
@@ -199,6 +204,7 @@ def integrate_data_parallel():
                 "batch": kitem["batch"],
                 "machine_id": MACHINE_ID,
                 "level": kitem["level"],
+                "is_repeat": item['is_repeat']
             })
         DBSA.flush()
         if rollback[0]:
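
-- 
Reviewer note, not part of the patch: the intended flow for the new Redis-based dedup
appears to be (1) backfill the "video_keys" hash from MySQL once, (2) consult it per
(v_xid, rn) pair while crawling, and (3) let DBSA.flush() keep the hash current for newly
inserted rows. A minimal usage sketch, assuming DB.py is importable and the Redis/MySQL
endpoints it configures are reachable; the xid and region values below are made up for
illustration only:

    from DB import DBVidcon, DBSA

    db = DBVidcon()

    # One-off backfill: stream (v_xid, rn) pairs from the video table in chunks
    # and mirror them into the Redis hash used for duplicate lookups.
    for chunk in DBSA.stream_video_keys(chunk_size=10_000):
        db.cache_video_keys_bulk(chunk)

    # Per-item check during crawling: a hit means the pair is already known,
    # so the item can be flagged as a repeat instead of treated as new.
    if db.video_key_exists("x7abc123", "US"):      # hypothetical xid / region
        print("duplicate, mark is_repeat = 1")
    else:
        db.cache_video_key("x7abc123", "US")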