Compare commits
1 Commits
Author | SHA1 | Date | |
---|---|---|---|
12915c13a5 |
96
DB.py
96
DB.py
@ -152,6 +152,7 @@ class DBVidcon:
|
||||
"password": "qwert@$123!&",
|
||||
"decode_responses": True,
|
||||
}
|
||||
VIDEO_HASH_KEY = "video_keys"
|
||||
|
||||
def __init__(self):
|
||||
self.l0_list_key = "video_l0_queue"
|
||||
@ -500,6 +501,39 @@ class DBVidcon:
|
||||
# 如果你存入的是 JSON 字符串,可以在这里做一次反序列化:
|
||||
return json.loads(item) if item is not None else None
|
||||
|
||||
# =========================
|
||||
|
||||
@redis_retry(max_retries=3)
|
||||
def cache_video_key(self, v_xid: str, rn: str):
|
||||
"""把 (v_xid,rn) 写进 Redis 哈希,幂等。"""
|
||||
if v_xid and rn:
|
||||
self.redis.hset(self.VIDEO_HASH_KEY, f"{v_xid}:{rn}", 1)
|
||||
|
||||
@redis_retry(max_retries=3)
|
||||
def cache_video_keys_bulk(self, pairs: List[tuple]):
|
||||
"""pairs = [(vxid, rn), …] 批量写入"""
|
||||
if not pairs:
|
||||
return
|
||||
mapping = {f"{vx}:{rn}": 1 for vx, rn in pairs if vx and rn}
|
||||
if mapping:
|
||||
self.redis.hset(self.VIDEO_HASH_KEY, mapping=mapping)
|
||||
|
||||
@redis_retry(max_retries=3)
|
||||
def video_key_exists(self, v_xid: str, rn: str) -> bool:
|
||||
"""Redis 判断 (vxid,rn) 是否已存在"""
|
||||
return bool(self.redis.hexists(self.VIDEO_HASH_KEY, f"{v_xid}:{rn}"))
|
||||
|
||||
@redis_retry(max_retries=3)
|
||||
def video_keys_exist_bulk(self, pairs: List[tuple]) -> set:
|
||||
"""
|
||||
返回已存在的键集合 {"vxid:rn", …}
|
||||
"""
|
||||
pipe = self.redis.pipeline()
|
||||
for vx, rn in pairs:
|
||||
pipe.hexists(self.VIDEO_HASH_KEY, f"{vx}:{rn}")
|
||||
flags = pipe.execute()
|
||||
return {f"{vx}:{rn}" for (vx, rn), flag in zip(pairs, flags) if flag}
|
||||
|
||||
|
||||
class DBSA:
|
||||
FLUSH_EVERY_ROWS = 100 # 行阈值
|
||||
@ -602,36 +636,6 @@ class DBSA:
|
||||
if not op_rows and not vid_rows:
|
||||
return
|
||||
|
||||
existing_keys = set()
|
||||
if vid_rows:
|
||||
# 收集 (v_xid, rn) 对,应与 video 表中的唯一索引匹配
|
||||
all_keys = list({(row["v_xid"], row["rn"]) for row in vid_rows})
|
||||
conn = _engine.connect()
|
||||
try:
|
||||
sel_vid = (
|
||||
video.select()
|
||||
.with_only_columns(video.c.v_xid, video.c.rn)
|
||||
.where(tuple_(video.c.v_xid, video.c.rn).in_(all_keys))
|
||||
)
|
||||
existing_keys = {(row.v_xid, row.rn) for row in conn.execute(sel_vid).fetchall()}
|
||||
except Exception as e:
|
||||
logger.info(f"[DBSA] 查询 video 表时异常: {e}")
|
||||
try:
|
||||
cls.push_record_many(payloads)
|
||||
except Exception as re:
|
||||
logger.info("[Redis 回退失败]", re)
|
||||
return
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
for i, vid_row in enumerate(vid_rows):
|
||||
key = (vid_row["v_xid"], vid_row["rn"])
|
||||
if key in existing_keys:
|
||||
op_rows[i]["is_repeat"] = 1
|
||||
else:
|
||||
op_rows[i]["is_repeat"] = 2
|
||||
|
||||
# 以下作者表、日志表和视频表写入逻辑保持不变...
|
||||
authors_map = {}
|
||||
now_ts = int(time.time())
|
||||
for data in payloads:
|
||||
@ -645,8 +649,6 @@ class DBSA:
|
||||
"u_pic": data.get("u_pic"),
|
||||
"follow_number": data.get("fans", 0),
|
||||
"v_number": data.get("videos", 0),
|
||||
"pv_number": 0,
|
||||
"b_number": 0,
|
||||
"create_time": datetime.utcnow(),
|
||||
"update_time": now_ts
|
||||
}
|
||||
@ -659,8 +661,6 @@ class DBSA:
|
||||
"u_pic": stmt_author.inserted.u_pic,
|
||||
"follow_number": stmt_author.inserted.follow_number,
|
||||
"v_number": stmt_author.inserted.v_number,
|
||||
"pv_number": stmt_author.inserted.pv_number,
|
||||
"b_number": stmt_author.inserted.b_number,
|
||||
"update_time": stmt_author.inserted.update_time,
|
||||
}
|
||||
ondup_author = stmt_author.on_duplicate_key_update(**upd_author)
|
||||
@ -686,13 +686,21 @@ class DBSA:
|
||||
logger.info("[Redis 回退失败]", re)
|
||||
return
|
||||
|
||||
for vid_row in vid_rows:
|
||||
# 保留 is_repeat 信息,后面用于判断新插入
|
||||
new_pairs = []
|
||||
for i, vid_row in enumerate(vid_rows):
|
||||
if op_rows[i].get("is_repeat") == 2:
|
||||
new_pairs.append((vid_row["v_xid"], vid_row["rn"]))
|
||||
vid_row.pop("is_repeat", None)
|
||||
vid_row.pop("level", None)
|
||||
|
||||
if vid_rows:
|
||||
try:
|
||||
cls._bulk_upsert(vid_rows)
|
||||
if new_pairs:
|
||||
db = DBVidcon()
|
||||
db.cache_video_keys_bulk(new_pairs)
|
||||
logger.info(f"[DBSA] 同步 {len(new_pairs)} 条新视频到 Redis video_keys")
|
||||
logger.info(
|
||||
f"[DBSA] flush 完成:authors={len(author_rows)} 条,op={len(op_rows)} 条,video={len(vid_rows)} 条"
|
||||
)
|
||||
@ -737,3 +745,21 @@ class DBSA:
|
||||
daemon=True
|
||||
)
|
||||
thread.start()
|
||||
|
||||
@classmethod
|
||||
def stream_video_keys(cls, chunk_size: int = 10_000):
|
||||
"""yield [(vxid, rn), …] 批次,纯 SQLAlchemy,无 Redis 依赖"""
|
||||
with _engine.connect() as conn:
|
||||
result = (
|
||||
conn.execution_options(stream_results=True)
|
||||
.execute(video.select()
|
||||
.with_only_columns(video.c.v_xid, video.c.rn))
|
||||
)
|
||||
batch = []
|
||||
for vxid, rn in result:
|
||||
batch.append((vxid, rn))
|
||||
if len(batch) >= chunk_size:
|
||||
yield batch
|
||||
batch = []
|
||||
if batch:
|
||||
yield batch
|
@ -1,6 +1,6 @@
|
||||
import json, time
|
||||
import argparse
|
||||
from DB import DBVidcon
|
||||
from DB import DBVidcon, DBSA
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser(
|
||||
@ -14,8 +14,10 @@ def main():
|
||||
args = parse_args()
|
||||
batch = int(time.time())
|
||||
db = DBVidcon()
|
||||
push = None
|
||||
empty = None
|
||||
|
||||
for chunk in DBSA.stream_video_keys(chunk_size=10_000):
|
||||
db.cache_video_keys_bulk(chunk)
|
||||
print(f"同步Redis=={len(chunk)}")
|
||||
|
||||
if args.level == 0:
|
||||
push = db.push_l0
|
||||
|
24
main.py
24
main.py
@ -55,7 +55,7 @@ def format_duration(seconds):
|
||||
return "00:00"
|
||||
|
||||
|
||||
def get_searchInfo(keyword, level, headers, proxy_name, r=2):
|
||||
def get_searchInfo(keyword, level, rn, proxy_name, r=2):
|
||||
if r == 2:
|
||||
logger.info(f"NET处理->{keyword},\trn->{proxy_name},\tlevel->{level}")
|
||||
video_list = []
|
||||
@ -87,7 +87,7 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
|
||||
logger.exception(f"[Requested] 未知:{e}, keyword: {keyword}, l: {level}")
|
||||
else:
|
||||
time.sleep((3 - r) * 5)
|
||||
return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
|
||||
return get_searchInfo(keyword, level, rn, proxy_name, r - 1)
|
||||
try:
|
||||
resinfo = jsondata.get("list")
|
||||
except Exception:
|
||||
@ -97,7 +97,7 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
|
||||
return None
|
||||
else:
|
||||
time.sleep((3 - r) * 5)
|
||||
return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
|
||||
return get_searchInfo(keyword, level, rn, proxy_name, r - 1)
|
||||
for index, iteminfo in enumerate(resinfo):
|
||||
calculated_index = index + 1 + (j - 1) * limit
|
||||
xid = iteminfo["id"]
|
||||
@ -105,6 +105,10 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
|
||||
uxid = iteminfo["owner.id"]
|
||||
uid = base64.b64encode(f"Channel:{uxid}".encode('utf-8')).decode('utf-8')
|
||||
duration = iteminfo.get('duration')
|
||||
is_repeat = 0
|
||||
if db.video_key_exists(vid.strip(), rn):
|
||||
is_repeat = 1
|
||||
|
||||
if duration <= 300:
|
||||
continue
|
||||
v_data = {
|
||||
@ -122,10 +126,13 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
|
||||
"u_id": uid,
|
||||
"u_xid": uxid,
|
||||
"u_name": iteminfo.get('owner.screenname'),
|
||||
"u_pic": iteminfo.get('owner.avatar_60_url')
|
||||
"u_pic": iteminfo.get('owner.avatar_60_url'),
|
||||
"is_repeat": is_repeat,
|
||||
}
|
||||
video_list.append(v_data)
|
||||
time.sleep(3)
|
||||
time.sleep(3)
|
||||
if len(video_list) < 100:
|
||||
break
|
||||
return video_list
|
||||
|
||||
|
||||
@ -135,11 +142,11 @@ proxiesdict = db.get_proxy_agent_dict()
|
||||
def search_worker(payload, kitem, flag):
|
||||
try:
|
||||
gproxies = proxiesdict[kitem['rn']]
|
||||
v_list = get_searchInfo(kitem['keyword'], kitem['level'], None, gproxies)
|
||||
v_list = get_searchInfo(kitem['keyword'], kitem['level'], kitem['rn'], gproxies)
|
||||
if not v_list:
|
||||
for i in range(2):
|
||||
time.sleep(i * 5)
|
||||
v_list = get_searchInfo(kitem['keyword'], kitem['level'], None, gproxies)
|
||||
v_list = get_searchInfo(kitem['keyword'], kitem['level'], kitem['rn'], gproxies)
|
||||
if v_list:
|
||||
break
|
||||
time.sleep(2)
|
||||
@ -175,8 +182,6 @@ def integrate_data_parallel():
|
||||
continue
|
||||
|
||||
for item in v_list:
|
||||
if not v_list:
|
||||
continue
|
||||
DBSA.upsert_video({
|
||||
"keyword": kitem["keyword"],
|
||||
"v_name": kitem["v_name"],
|
||||
@ -199,6 +204,7 @@ def integrate_data_parallel():
|
||||
"batch": kitem["batch"],
|
||||
"machine_id": MACHINE_ID,
|
||||
"level": kitem["level"],
|
||||
"is_repeat": item['is_repeat']
|
||||
})
|
||||
DBSA.flush()
|
||||
if rollback[0]:
|
||||
|
Loading…
x
Reference in New Issue
Block a user