Compare commits

...

1 Commits

3 changed files with 81 additions and 47 deletions

96
DB.py
View File

@ -152,6 +152,7 @@ class DBVidcon:
"password": "qwert@$123!&", "password": "qwert@$123!&",
"decode_responses": True, "decode_responses": True,
} }
VIDEO_HASH_KEY = "video_keys"
def __init__(self): def __init__(self):
self.l0_list_key = "video_l0_queue" self.l0_list_key = "video_l0_queue"
@ -500,6 +501,39 @@ class DBVidcon:
# 如果你存入的是 JSON 字符串,可以在这里做一次反序列化: # 如果你存入的是 JSON 字符串,可以在这里做一次反序列化:
return json.loads(item) if item is not None else None return json.loads(item) if item is not None else None
# =========================
@redis_retry(max_retries=3)
def cache_video_key(self, v_xid: str, rn: str):
"""把 (v_xid,rn) 写进 Redis 哈希,幂等。"""
if v_xid and rn:
self.redis.hset(self.VIDEO_HASH_KEY, f"{v_xid}:{rn}", 1)
@redis_retry(max_retries=3)
def cache_video_keys_bulk(self, pairs: List[tuple]):
"""pairs = [(vxid, rn), …] 批量写入"""
if not pairs:
return
mapping = {f"{vx}:{rn}": 1 for vx, rn in pairs if vx and rn}
if mapping:
self.redis.hset(self.VIDEO_HASH_KEY, mapping=mapping)
@redis_retry(max_retries=3)
def video_key_exists(self, v_xid: str, rn: str) -> bool:
"""Redis 判断 (vxid,rn) 是否已存在"""
return bool(self.redis.hexists(self.VIDEO_HASH_KEY, f"{v_xid}:{rn}"))
@redis_retry(max_retries=3)
def video_keys_exist_bulk(self, pairs: List[tuple]) -> set:
"""
返回已存在的键集合 {"vxid:rn", }
"""
pipe = self.redis.pipeline()
for vx, rn in pairs:
pipe.hexists(self.VIDEO_HASH_KEY, f"{vx}:{rn}")
flags = pipe.execute()
return {f"{vx}:{rn}" for (vx, rn), flag in zip(pairs, flags) if flag}
class DBSA: class DBSA:
FLUSH_EVERY_ROWS = 100 # 行阈值 FLUSH_EVERY_ROWS = 100 # 行阈值
@ -602,36 +636,6 @@ class DBSA:
if not op_rows and not vid_rows: if not op_rows and not vid_rows:
return return
existing_keys = set()
if vid_rows:
# 收集 (v_xid, rn) 对,应与 video 表中的唯一索引匹配
all_keys = list({(row["v_xid"], row["rn"]) for row in vid_rows})
conn = _engine.connect()
try:
sel_vid = (
video.select()
.with_only_columns(video.c.v_xid, video.c.rn)
.where(tuple_(video.c.v_xid, video.c.rn).in_(all_keys))
)
existing_keys = {(row.v_xid, row.rn) for row in conn.execute(sel_vid).fetchall()}
except Exception as e:
logger.info(f"[DBSA] 查询 video 表时异常: {e}")
try:
cls.push_record_many(payloads)
except Exception as re:
logger.info("[Redis 回退失败]", re)
return
finally:
conn.close()
for i, vid_row in enumerate(vid_rows):
key = (vid_row["v_xid"], vid_row["rn"])
if key in existing_keys:
op_rows[i]["is_repeat"] = 1
else:
op_rows[i]["is_repeat"] = 2
# 以下作者表、日志表和视频表写入逻辑保持不变...
authors_map = {} authors_map = {}
now_ts = int(time.time()) now_ts = int(time.time())
for data in payloads: for data in payloads:
@ -645,8 +649,6 @@ class DBSA:
"u_pic": data.get("u_pic"), "u_pic": data.get("u_pic"),
"follow_number": data.get("fans", 0), "follow_number": data.get("fans", 0),
"v_number": data.get("videos", 0), "v_number": data.get("videos", 0),
"pv_number": 0,
"b_number": 0,
"create_time": datetime.utcnow(), "create_time": datetime.utcnow(),
"update_time": now_ts "update_time": now_ts
} }
@ -659,8 +661,6 @@ class DBSA:
"u_pic": stmt_author.inserted.u_pic, "u_pic": stmt_author.inserted.u_pic,
"follow_number": stmt_author.inserted.follow_number, "follow_number": stmt_author.inserted.follow_number,
"v_number": stmt_author.inserted.v_number, "v_number": stmt_author.inserted.v_number,
"pv_number": stmt_author.inserted.pv_number,
"b_number": stmt_author.inserted.b_number,
"update_time": stmt_author.inserted.update_time, "update_time": stmt_author.inserted.update_time,
} }
ondup_author = stmt_author.on_duplicate_key_update(**upd_author) ondup_author = stmt_author.on_duplicate_key_update(**upd_author)
@ -686,13 +686,21 @@ class DBSA:
logger.info("[Redis 回退失败]", re) logger.info("[Redis 回退失败]", re)
return return
for vid_row in vid_rows: # 保留 is_repeat 信息,后面用于判断新插入
new_pairs = []
for i, vid_row in enumerate(vid_rows):
if op_rows[i].get("is_repeat") == 2:
new_pairs.append((vid_row["v_xid"], vid_row["rn"]))
vid_row.pop("is_repeat", None) vid_row.pop("is_repeat", None)
vid_row.pop("level", None) vid_row.pop("level", None)
if vid_rows: if vid_rows:
try: try:
cls._bulk_upsert(vid_rows) cls._bulk_upsert(vid_rows)
if new_pairs:
db = DBVidcon()
db.cache_video_keys_bulk(new_pairs)
logger.info(f"[DBSA] 同步 {len(new_pairs)} 条新视频到 Redis video_keys")
logger.info( logger.info(
f"[DBSA] flush 完成authors={len(author_rows)}op={len(op_rows)}video={len(vid_rows)}" f"[DBSA] flush 完成authors={len(author_rows)}op={len(op_rows)}video={len(vid_rows)}"
) )
@ -737,3 +745,21 @@ class DBSA:
daemon=True daemon=True
) )
thread.start() thread.start()
@classmethod
def stream_video_keys(cls, chunk_size: int = 10_000):
"""yield [(vxid, rn), …] 批次,纯 SQLAlchemy无 Redis 依赖"""
with _engine.connect() as conn:
result = (
conn.execution_options(stream_results=True)
.execute(video.select()
.with_only_columns(video.c.v_xid, video.c.rn))
)
batch = []
for vxid, rn in result:
batch.append((vxid, rn))
if len(batch) >= chunk_size:
yield batch
batch = []
if batch:
yield batch

View File

@ -1,6 +1,6 @@
import json, time import json, time
import argparse import argparse
from DB import DBVidcon from DB import DBVidcon, DBSA
def parse_args(): def parse_args():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
@ -14,8 +14,10 @@ def main():
args = parse_args() args = parse_args()
batch = int(time.time()) batch = int(time.time())
db = DBVidcon() db = DBVidcon()
push = None
empty = None for chunk in DBSA.stream_video_keys(chunk_size=10_000):
db.cache_video_keys_bulk(chunk)
print(f"同步Redis=={len(chunk)}")
if args.level == 0: if args.level == 0:
push = db.push_l0 push = db.push_l0

24
main.py
View File

@ -55,7 +55,7 @@ def format_duration(seconds):
return "00:00" return "00:00"
def get_searchInfo(keyword, level, headers, proxy_name, r=2): def get_searchInfo(keyword, level, rn, proxy_name, r=2):
if r == 2: if r == 2:
logger.info(f"NET处理->{keyword},\trn->{proxy_name},\tlevel->{level}") logger.info(f"NET处理->{keyword},\trn->{proxy_name},\tlevel->{level}")
video_list = [] video_list = []
@ -87,7 +87,7 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
logger.exception(f"[Requested] 未知:{e}, keyword: {keyword}, l: {level}") logger.exception(f"[Requested] 未知:{e}, keyword: {keyword}, l: {level}")
else: else:
time.sleep((3 - r) * 5) time.sleep((3 - r) * 5)
return get_searchInfo(keyword, level, headers, proxy_name, r - 1) return get_searchInfo(keyword, level, rn, proxy_name, r - 1)
try: try:
resinfo = jsondata.get("list") resinfo = jsondata.get("list")
except Exception: except Exception:
@ -97,7 +97,7 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
return None return None
else: else:
time.sleep((3 - r) * 5) time.sleep((3 - r) * 5)
return get_searchInfo(keyword, level, headers, proxy_name, r - 1) return get_searchInfo(keyword, level, rn, proxy_name, r - 1)
for index, iteminfo in enumerate(resinfo): for index, iteminfo in enumerate(resinfo):
calculated_index = index + 1 + (j - 1) * limit calculated_index = index + 1 + (j - 1) * limit
xid = iteminfo["id"] xid = iteminfo["id"]
@ -105,6 +105,10 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
uxid = iteminfo["owner.id"] uxid = iteminfo["owner.id"]
uid = base64.b64encode(f"Channel:{uxid}".encode('utf-8')).decode('utf-8') uid = base64.b64encode(f"Channel:{uxid}".encode('utf-8')).decode('utf-8')
duration = iteminfo.get('duration') duration = iteminfo.get('duration')
is_repeat = 0
if db.video_key_exists(vid.strip(), rn):
is_repeat = 1
if duration <= 300: if duration <= 300:
continue continue
v_data = { v_data = {
@ -122,10 +126,13 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
"u_id": uid, "u_id": uid,
"u_xid": uxid, "u_xid": uxid,
"u_name": iteminfo.get('owner.screenname'), "u_name": iteminfo.get('owner.screenname'),
"u_pic": iteminfo.get('owner.avatar_60_url') "u_pic": iteminfo.get('owner.avatar_60_url'),
"is_repeat": is_repeat,
} }
video_list.append(v_data) video_list.append(v_data)
time.sleep(3) time.sleep(3)
if len(video_list) < 100:
break
return video_list return video_list
@ -135,11 +142,11 @@ proxiesdict = db.get_proxy_agent_dict()
def search_worker(payload, kitem, flag): def search_worker(payload, kitem, flag):
try: try:
gproxies = proxiesdict[kitem['rn']] gproxies = proxiesdict[kitem['rn']]
v_list = get_searchInfo(kitem['keyword'], kitem['level'], None, gproxies) v_list = get_searchInfo(kitem['keyword'], kitem['level'], kitem['rn'], gproxies)
if not v_list: if not v_list:
for i in range(2): for i in range(2):
time.sleep(i * 5) time.sleep(i * 5)
v_list = get_searchInfo(kitem['keyword'], kitem['level'], None, gproxies) v_list = get_searchInfo(kitem['keyword'], kitem['level'], kitem['rn'], gproxies)
if v_list: if v_list:
break break
time.sleep(2) time.sleep(2)
@ -175,8 +182,6 @@ def integrate_data_parallel():
continue continue
for item in v_list: for item in v_list:
if not v_list:
continue
DBSA.upsert_video({ DBSA.upsert_video({
"keyword": kitem["keyword"], "keyword": kitem["keyword"],
"v_name": kitem["v_name"], "v_name": kitem["v_name"],
@ -199,6 +204,7 @@ def integrate_data_parallel():
"batch": kitem["batch"], "batch": kitem["batch"],
"machine_id": MACHINE_ID, "machine_id": MACHINE_ID,
"level": kitem["level"], "level": kitem["level"],
"is_repeat": item['is_repeat']
}) })
DBSA.flush() DBSA.flush()
if rollback[0]: if rollback[0]: