feat: 更新视频插入逻辑以包括is_repeat字段并改进代理池为空时的阻塞处理

This commit is contained in:
晓丰 2025-05-17 00:33:55 +08:00
parent 12bbd2340b
commit dd90cc3c91
4 changed files with 73 additions and 72 deletions

82
DB.py
View File

@ -146,55 +146,67 @@ class DBVidcon:
print(f"[回滚] 已退回 {len(payloads)}") print(f"[回滚] 已退回 {len(payloads)}")
def upsert_video(self, data: dict): def upsert_video(self, data: dict):
"""
1) 插入到 sh_dm_video_op_v2
2) DELETE sh_dm_video_v2 WHERE rn = AND v_xid =
3) INSERT INTO sh_dm_video_v2 () VALUES ()
"""
# 保底字段
data.setdefault("a_id", 0) data.setdefault("a_id", 0)
data.setdefault("history_status", "") data.setdefault("history_status", "")
data.setdefault("is_repeat", 3) data.setdefault("is_repeat", 3)
data["sort"] = data.get("index", 0) data["sort"] = data.get("index", 0)
try: try:
select_repeat = """
SELECT is_repeat
FROM sh_dm_video_v2
WHERE rn = %(rn)s
AND v_xid = %(v_xid)s
LIMIT 1
"""
self.cursor.execute(select_repeat, data)
row = self.cursor.fetchone()
if row:
data['is_repeat'] = row[0]
# 2. 插入到 op 表
sql_op = """ sql_op = """
INSERT INTO sh_dm_video_op_v2 ( INSERT INTO sh_dm_video_op_v2 (
v_id, v_xid, a_id, level, name_title, v_id, v_xid, a_id, level, name_title,
keyword, rn, history_status, is_repeat, keyword, rn, history_status, is_repeat,
sort, createtime, updatetime, batch, machine sort, createtime, updatetime, batch, machine
) VALUES ( ) VALUES (
%(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s, %(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s,
%(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s, %(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s,
%(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s %(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s
) )
""" """
self.cursor.execute(sql_op, data) self.cursor.execute(sql_op, data)
# 3. 删除旧表中的那行
sql_del = """ sql_del = """
DELETE FROM sh_dm_video_v2 DELETE FROM sh_dm_video_v2
WHERE rn = %(rn)s WHERE rn = %(rn)s
AND v_xid = %(v_xid)s AND v_xid = %(v_xid)s
""" """
self.cursor.execute(sql_del, data) self.cursor.execute(sql_del, data)
# 4. 带上 is_repeat 再插入新数据
sql_ins = """ sql_ins = """
INSERT INTO sh_dm_video_v2 ( INSERT INTO sh_dm_video_v2 (
v_id, v_xid, rn, v_name, title, link, v_id, v_xid, rn, v_name, title, link,
is_piracy, edition, duration, is_piracy, edition, duration,
watch_number, follow_number, video_number, watch_number, follow_number, video_number,
public_time, cover_pic, sort, public_time, cover_pic, sort,
u_xid, u_id, u_pic, u_name, u_xid, u_id, u_pic, u_name,
status, createtime, updatetime status, createtime, updatetime,
) VALUES ( is_repeat
%(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s, ) VALUES (
3, '', %(duration)s, %(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s,
%(watch_number)s, %(fans)s, %(videos)s, %(is_piracy)s, '', %(duration)s,
%(create_time)s, %(cover_pic)s, %(sort)s, %(watch_number)s, %(fans)s, %(videos)s,
%(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s, %(create_time)s, %(cover_pic)s, %(sort)s,
1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP() %(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s,
) 1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(),
%(is_repeat)s
)
""" """
self.cursor.execute(sql_ins, data) self.cursor.execute(sql_ins, data)
except Exception as e: except Exception as e:
# 打印错误并回滚 # 打印错误并回滚
print("[数据库写入异常]", str(e)) print("[数据库写入异常]", str(e))
@ -212,7 +224,7 @@ class DBVidcon:
def get_proxy(self, region_code: str) -> str: def get_proxy(self, region_code: str) -> str:
""" """
Redis 队列 proxy_queue:<region_code> 弹出一个代理并返回 Redis 队列 proxy_queue:<region_code> 弹出一个代理并返回
如果队列为空返回空字符串 如果队列为空阻塞
""" """
proxy = "" proxy = ""
while True: while True:

View File

@ -15,12 +15,12 @@ proxies_address = {
"印度尼西亚": "ID", "印度尼西亚": "ID",
"马来": "MY", "马来": "MY",
"加拿大": "CA", "加拿大": "CA",
"台湾": "TW", #"CN_city_TW", "台湾": "CN_city_TW", # "TW", #
"泰国": "TH", "泰国": "TH",
"美国": "US", "美国": "US",
"西班牙": "ES", "西班牙": "ES",
"韩国": "KR", "韩国": "KR",
"香港": "HK", #"CN_city_HK", "香港": "CN_city_HK", # "HK", #
"越南": "VN", "越南": "VN",
} }
MACHINE_ID = None MACHINE_ID = None
@ -137,6 +137,7 @@ def post_with_retry(url, json_payload=None, data=None, headers=None, proxies=Non
for attempt in range(1, retries + 1): for attempt in range(1, retries + 1):
try: try:
proxy_str = db.get_proxy(Gproxies) proxy_str = db.get_proxy(Gproxies)
proxies = {"http": proxy_str, "https": proxy_str} proxies = {"http": proxy_str, "https": proxy_str}
resp = requests.post( resp = requests.post(

View File

@ -28,12 +28,12 @@ PROXIES_ADDRESS = {
"印度尼西亚": "ID", "印度尼西亚": "ID",
"马来": "MY", "马来": "MY",
"加拿大": "CA", "加拿大": "CA",
"台湾": "TW", #"CN_city_TW", "台湾": "CN_city_TW", #"TW", #
"泰国": "TH", "泰国": "TH",
"美国": "US", "美国": "US",
"西班牙": "ES", "西班牙": "ES",
"韩国": "KR", "韩国": "KR",
"香港": "HK", #"CN_city_HK", "香港": "CN_city_HK", #"HK", #
"越南": "VN", "越南": "VN",
} }

View File

@ -25,40 +25,28 @@ def get_data_for_rn(rn: str) -> pd.DataFrame:
# 注意:这里把 SQL 中的 rn 和 level 参数化 # 注意:这里把 SQL 中的 rn 和 level 参数化
sql = f""" sql = f"""
SELECT SELECT
op.id AS ID, op.id AS ID,
v.v_name AS 片名, v.v_name AS 片名,
v.link AS 视频连接, v.link AS 视频连接,
v.is_piracy AS 是否盗版, v.is_piracy AS 是否盗版,
op.`level` AS 优先级, op.`level` AS 优先级,
op.rn AS 地区, op.rn AS 地区,
NULL AS 投诉日期, NULL AS 投诉日期,
NULL AS 下线日期, NULL AS 下线日期,
op.keyword AS 关键词, op.keyword AS 关键词,
v.title AS 标题, v.title AS 标题,
v.duration AS 时长, v.duration AS 时长,
v.watch_number AS 观看数量, v.watch_number AS 观看数量,
v.public_time AS 上传时间, v.public_time AS 上传时间,
v.u_pic AS 头像, v.u_pic AS 头像,
CASE v.is_repeat AS 是否重复, -- 直接用字段
WHEN dup.cnt > 1 THEN 1 op.sort AS 排序,
ELSE 2 op.batch AS 批次,
END AS 是否重复, op.machine AS 机器号,
op.sort AS 排序, v.u_id AS 用户id,
op.batch AS 批次, v.u_xid AS u_xid,
op.machine AS 机器号, v.u_name AS 用户名称
v.u_id AS 用户id,
v.u_xid AS u_xid,
v.u_name AS 用户名称
FROM sh_dm_video_op_v2 AS op FROM sh_dm_video_op_v2 AS op
LEFT JOIN (
SELECT
t.v_xid,
COUNT(*) AS cnt
FROM sh_dm_video_op_v2 AS t
WHERE t.batch IN (1747324254, 1747323990)
GROUP BY t.v_xid
) AS dup
ON op.v_xid = dup.v_xid
LEFT JOIN sh_dm_video_v2 AS v LEFT JOIN sh_dm_video_v2 AS v
ON op.v_xid = v.v_xid ON op.v_xid = v.v_xid
WHERE op.rn = %s WHERE op.rn = %s