Compare commits

1 commit: 12915c13a5
DB.py (96 changed lines)
```diff
@@ -152,6 +152,7 @@ class DBVidcon:
         "password": "qwert@$123!&",
         "decode_responses": True,
     }
+    VIDEO_HASH_KEY = "video_keys"

    def __init__(self):
        self.l0_list_key = "video_l0_queue"
```
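The new class constant names a single Redis hash with one field per (v_xid, rn) pair; membership alone marks a video as seen, the stored value is irrelevant. A minimal sketch of the layout, with illustrative key values and connection details assumed:

```python
# Layout sketch for the video_keys hash (illustrative values; the real
# client is built inside DBVidcon with the config shown above).
import redis

r = redis.Redis(decode_responses=True)        # connection details assumed
r.hset("video_keys", "x8abc12:US", 1)         # field "<v_xid>:<rn>", value 1
print(r.hexists("video_keys", "x8abc12:US"))  # True -> already crawled
```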
```diff
@@ -500,6 +501,39 @@ class DBVidcon:
         # If the stored value is a JSON string, deserialize it here:
         return json.loads(item) if item is not None else None

+    # =========================
+
+    @redis_retry(max_retries=3)
+    def cache_video_key(self, v_xid: str, rn: str):
+        """Write (v_xid, rn) into the Redis hash; idempotent."""
+        if v_xid and rn:
+            self.redis.hset(self.VIDEO_HASH_KEY, f"{v_xid}:{rn}", 1)
+
+    @redis_retry(max_retries=3)
+    def cache_video_keys_bulk(self, pairs: List[tuple]):
+        """Bulk write; pairs = [(vxid, rn), …]."""
+        if not pairs:
+            return
+        mapping = {f"{vx}:{rn}": 1 for vx, rn in pairs if vx and rn}
+        if mapping:
+            self.redis.hset(self.VIDEO_HASH_KEY, mapping=mapping)
+
+    @redis_retry(max_retries=3)
+    def video_key_exists(self, v_xid: str, rn: str) -> bool:
+        """Check in Redis whether (v_xid, rn) already exists."""
+        return bool(self.redis.hexists(self.VIDEO_HASH_KEY, f"{v_xid}:{rn}"))
+
+    @redis_retry(max_retries=3)
+    def video_keys_exist_bulk(self, pairs: List[tuple]) -> set:
+        """
+        Return the set of keys that already exist: {"vxid:rn", …}
+        """
+        pipe = self.redis.pipeline()
+        for vx, rn in pairs:
+            pipe.hexists(self.VIDEO_HASH_KEY, f"{vx}:{rn}")
+        flags = pipe.execute()
+        return {f"{vx}:{rn}" for (vx, rn), flag in zip(pairs, flags) if flag}
+

class DBSA:
    FLUSH_EVERY_ROWS = 100  # row threshold
```
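Together the four helpers give single and bulk writes plus single and bulk existence checks; the bulk check pipelines its HEXISTS calls so an entire batch costs one Redis round trip. A usage sketch, assuming a constructed DBVidcon as elsewhere in this diff:

```python
# Usage sketch for the new cache helpers (method names from the diff;
# the surrounding setup is assumed).
db = DBVidcon()

db.cache_video_key("x8abc12", "US")             # single idempotent write
db.cache_video_keys_bulk([("x8abc12", "US"),
                          ("x8def34", "FR")])   # one HSET with a mapping

db.video_key_exists("x8abc12", "US")            # single HEXISTS -> bool
hits = db.video_keys_exist_bulk([("x8abc12", "US"),
                                 ("x9zzz99", "DE")])
# hits == {"x8abc12:US"} -- only keys already present come back
```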
```diff
@@ -602,36 +636,6 @@ class DBSA:
         if not op_rows and not vid_rows:
             return

-        existing_keys = set()
-        if vid_rows:
-            # Collect (v_xid, rn) pairs; should match the unique index on the video table
-            all_keys = list({(row["v_xid"], row["rn"]) for row in vid_rows})
-            conn = _engine.connect()
-            try:
-                sel_vid = (
-                    video.select()
-                    .with_only_columns(video.c.v_xid, video.c.rn)
-                    .where(tuple_(video.c.v_xid, video.c.rn).in_(all_keys))
-                )
-                existing_keys = {(row.v_xid, row.rn) for row in conn.execute(sel_vid).fetchall()}
-            except Exception as e:
-                logger.info(f"[DBSA] error querying the video table: {e}")
-                try:
-                    cls.push_record_many(payloads)
-                except Exception as re:
-                    logger.info("[Redis fallback failed]", re)
-                return
-            finally:
-                conn.close()
-
-        for i, vid_row in enumerate(vid_rows):
-            key = (vid_row["v_xid"], vid_row["rn"])
-            if key in existing_keys:
-                op_rows[i]["is_repeat"] = 1
-            else:
-                op_rows[i]["is_repeat"] = 2
-
-        # The author, log, and video table write logic below is unchanged...
         authors_map = {}
         now_ts = int(time.time())
         for data in payloads:
```
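Note the flag encoding: the removed block wrote is_repeat = 1 for rows already in the video table and 2 for new rows. The crawl-time check added in main.py (further down) writes 0/1 instead, yet the flush path below still tests for == 2 when collecting new_pairs; unless the two encodings are reconciled somewhere not shown, no pairs would ever sync back to Redis. A sketch of one possible reconciliation, with a hypothetical helper that is not in the diff:

```python
# Hypothetical normalization -- NOT part of the commit. Maps the
# crawl-time flag (0 = unseen, 1 = seen in Redis) onto the flush-time
# convention (1 = repeat, 2 = new) that new_pairs still expects.
def normalize_is_repeat(crawl_flag: int) -> int:
    return 1 if crawl_flag == 1 else 2
```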
```diff
@@ -645,8 +649,6 @@ class DBSA:
                 "u_pic": data.get("u_pic"),
                 "follow_number": data.get("fans", 0),
                 "v_number": data.get("videos", 0),
-                "pv_number": 0,
-                "b_number": 0,
                 "create_time": datetime.utcnow(),
                 "update_time": now_ts
             }
```
```diff
@@ -659,8 +661,6 @@ class DBSA:
             "u_pic": stmt_author.inserted.u_pic,
             "follow_number": stmt_author.inserted.follow_number,
             "v_number": stmt_author.inserted.v_number,
-            "pv_number": stmt_author.inserted.pv_number,
-            "b_number": stmt_author.inserted.b_number,
             "update_time": stmt_author.inserted.update_time,
         }
         ondup_author = stmt_author.on_duplicate_key_update(**upd_author)
```
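This hunk and the previous one drop pv_number and b_number from both the insert payload and the on-duplicate update map, so existing values in those columns are no longer overwritten on upsert. For orientation, a minimal sketch of the MySQL upsert pattern these rows feed (the `author` Table object and its columns are assumed from the diff):

```python
# Upsert pattern sketch (SQLAlchemy MySQL dialect; `author` and
# `author_rows` are assumed from the surrounding code).
from sqlalchemy.dialects.mysql import insert as mysql_insert

stmt_author = mysql_insert(author).values(author_rows)
upd_author = {
    "follow_number": stmt_author.inserted.follow_number,
    "v_number": stmt_author.inserted.v_number,
    "update_time": stmt_author.inserted.update_time,
}
ondup_author = stmt_author.on_duplicate_key_update(**upd_author)
# executed later inside the flush transaction
```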
```diff
@@ -686,13 +686,21 @@ class DBSA:
                     logger.info("[Redis fallback failed]", re)
                 return

-        for vid_row in vid_rows:
+        # Keep the is_repeat info; used below to identify newly inserted rows
+        new_pairs = []
+        for i, vid_row in enumerate(vid_rows):
+            if op_rows[i].get("is_repeat") == 2:
+                new_pairs.append((vid_row["v_xid"], vid_row["rn"]))
             vid_row.pop("is_repeat", None)
             vid_row.pop("level", None)

         if vid_rows:
             try:
                 cls._bulk_upsert(vid_rows)
+                if new_pairs:
+                    db = DBVidcon()
+                    db.cache_video_keys_bulk(new_pairs)
+                    logger.info(f"[DBSA] synced {len(new_pairs)} new videos to Redis video_keys")
                 logger.info(
                     f"[DBSA] flush complete: authors={len(author_rows)} rows, op={len(op_rows)} rows, video={len(vid_rows)} rows"
                 )
```
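The cache write sits inside the same try as _bulk_upsert and runs only after it succeeds, so Redis only learns about rows that actually reached MySQL; the flip side is that a Redis hiccup at this point fails the whole flush. A sketch of one alternative, treating the cache as best-effort (an assumption, not what the commit does):

```python
# Best-effort variant -- an assumption, not the commit's behavior:
# a Redis failure logs and moves on instead of failing the flush.
cls._bulk_upsert(vid_rows)          # DB write first; raises on failure
if new_pairs:
    try:
        DBVidcon().cache_video_keys_bulk(new_pairs)
    except Exception as e:          # cache sync is recoverable later
        logger.info(f"[DBSA] Redis sync skipped: {e}")
```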
```diff
@@ -737,3 +745,21 @@ class DBSA:
             daemon=True
         )
         thread.start()
+
+    @classmethod
+    def stream_video_keys(cls, chunk_size: int = 10_000):
+        """Yield batches of [(vxid, rn), …]; pure SQLAlchemy, no Redis dependency."""
+        with _engine.connect() as conn:
+            result = (
+                conn.execution_options(stream_results=True)
+                .execute(video.select()
+                         .with_only_columns(video.c.v_xid, video.c.rn))
+            )
+            batch = []
+            for vxid, rn in result:
+                batch.append((vxid, rn))
+                if len(batch) >= chunk_size:
+                    yield batch
+                    batch = []
+            if batch:
+                yield batch
```
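execution_options(stream_results=True) asks the driver for a server-side cursor, so the full video table is never materialized in client memory; the manual batch list then regroups rows into chunk_size batches. SQLAlchemy 1.4+ also offers partitions() for the same regrouping; a standalone sketch with an assumed engine URL:

```python
# Standalone streaming sketch (SQLAlchemy 1.4+; the engine URL is an
# assumption). partitions() does the same regrouping as the manual list.
from sqlalchemy import create_engine, text

engine = create_engine("mysql+pymysql://user:pass@host/db")  # assumed
with engine.connect() as conn:
    result = conn.execution_options(stream_results=True).execute(
        text("SELECT v_xid, rn FROM video")
    )
    for part in result.partitions(10_000):
        print(len(part))   # each part is a list of up to 10_000 rows
```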
(unnamed file; header not captured)

```diff
@@ -1,6 +1,6 @@
 import json, time
 import argparse
-from DB import DBVidcon
+from DB import DBVidcon, DBSA

 def parse_args():
     parser = argparse.ArgumentParser(
```

```diff
@@ -14,8 +14,10 @@ def main():
     args = parse_args()
     batch = int(time.time())
     db = DBVidcon()
-    push = None
-    empty = None
+    for chunk in DBSA.stream_video_keys(chunk_size=10_000):
+        db.cache_video_keys_bulk(chunk)
+        print(f"Synced to Redis=={len(chunk)}")
+
     if args.level == 0:
         push = db.push_l0
```
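As written, this warm-up loop re-streams the entire video table into Redis every time the script starts. One possible guard, hypothetical and not in the commit, skips the scan when the hash already has entries:

```python
# Hypothetical start-up guard -- not in the commit. HLEN is O(1), so
# the size check is cheap; the scan only runs when the hash is empty.
if db.redis.hlen(db.VIDEO_HASH_KEY) == 0:
    for chunk in DBSA.stream_video_keys(chunk_size=10_000):
        db.cache_video_keys_bulk(chunk)
```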
main.py (24 changed lines)
```diff
@@ -55,7 +55,7 @@ def format_duration(seconds):
     return "00:00"


-def get_searchInfo(keyword, level, headers, proxy_name, r=2):
+def get_searchInfo(keyword, level, rn, proxy_name, r=2):
     if r == 2:
         logger.info(f"NET processing->{keyword},\trn->{proxy_name},\tlevel->{level}")
     video_list = []
```
```diff
@@ -87,7 +87,7 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
         logger.exception(f"[Requested] unknown: {e}, keyword: {keyword}, l: {level}")
     else:
         time.sleep((3 - r) * 5)
-        return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
+        return get_searchInfo(keyword, level, rn, proxy_name, r - 1)
     try:
         resinfo = jsondata.get("list")
     except Exception:
```
```diff
@@ -97,7 +97,7 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
             return None
         else:
             time.sleep((3 - r) * 5)
-            return get_searchInfo(keyword, level, headers, proxy_name, r - 1)
+            return get_searchInfo(keyword, level, rn, proxy_name, r - 1)
     for index, iteminfo in enumerate(resinfo):
         calculated_index = index + 1 + (j - 1) * limit
         xid = iteminfo["id"]
```
```diff
@@ -105,6 +105,10 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
         uxid = iteminfo["owner.id"]
         uid = base64.b64encode(f"Channel:{uxid}".encode('utf-8')).decode('utf-8')
         duration = iteminfo.get('duration')
+        is_repeat = 0
+        if db.video_key_exists(vid.strip(), rn):
+            is_repeat = 1
+
         if duration <= 300:
             continue
         v_data = {
```
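Two observations on this hunk: the lookup calls `vid.strip()`, but the visible context defines only `xid = iteminfo["id"]`, so `xid` is likely the intended variable; and the check runs before the `duration <= 300` filter, so videos that are about to be skipped still cost a Redis round trip. A sketch of the check under those assumptions:

```python
# Sketch only; assumes xid is the intended variable (`vid` is not
# defined in the visible context) and filters before the lookup.
duration = iteminfo.get('duration')
if duration <= 300:
    continue                     # short videos never reach Redis
is_repeat = 1 if db.video_key_exists(xid.strip(), rn) else 0
```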
```diff
@@ -122,10 +126,13 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
             "u_id": uid,
             "u_xid": uxid,
             "u_name": iteminfo.get('owner.screenname'),
-            "u_pic": iteminfo.get('owner.avatar_60_url')
+            "u_pic": iteminfo.get('owner.avatar_60_url'),
+            "is_repeat": is_repeat,
         }
         video_list.append(v_data)
         time.sleep(3)
+        if len(video_list) < 100:
+            break
     return video_list


```
```diff
@@ -135,11 +142,11 @@ proxiesdict = db.get_proxy_agent_dict()
 def search_worker(payload, kitem, flag):
     try:
         gproxies = proxiesdict[kitem['rn']]
-        v_list = get_searchInfo(kitem['keyword'], kitem['level'], None, gproxies)
+        v_list = get_searchInfo(kitem['keyword'], kitem['level'], kitem['rn'], gproxies)
         if not v_list:
             for i in range(2):
                 time.sleep(i * 5)
-                v_list = get_searchInfo(kitem['keyword'], kitem['level'], None, gproxies)
+                v_list = get_searchInfo(kitem['keyword'], kitem['level'], kitem['rn'], gproxies)
                 if v_list:
                     break
                 time.sleep(2)
```
```diff
@@ -175,8 +182,6 @@ def integrate_data_parallel():
             continue

         for item in v_list:
-            if not v_list:
-                continue
             DBSA.upsert_video({
                 "keyword": kitem["keyword"],
                 "v_name": kitem["v_name"],
```
```diff
@@ -199,6 +204,7 @@ def integrate_data_parallel():
                 "batch": kitem["batch"],
                 "machine_id": MACHINE_ID,
                 "level": kitem["level"],
+                "is_repeat": item['is_repeat']
             })
         DBSA.flush()
     if rollback[0]:
```
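With this last hunk the flag travels the whole pipeline: the crawl-time Redis check sets it, the video row carries it through DBSA.upsert_video to flush, and flush pushes the keys of genuinely new rows back into the hash. A self-contained sketch of that lifecycle in plain redis-py, standing in for the DBVidcon/DBSA wiring above:

```python
# Self-contained lifecycle sketch in plain redis-py; stands in for the
# DBVidcon/DBSA wiring shown in this diff.
import redis

r = redis.Redis(decode_responses=True)
key = "x8abc12:US"                                     # f"{v_xid}:{rn}"
is_repeat = 1 if r.hexists("video_keys", key) else 0   # crawl time
# ... the video row is upserted into MySQL with is_repeat attached ...
if not is_repeat:
    r.hset("video_keys", key, 1)                       # flush-time sync back
```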