Merge remote-tracking branch 'origin/main'

# Conflicts:
#	DB.py
This commit is contained in:
晓丰 2025-07-17 10:54:26 +08:00
commit 5bb852ac5c
2 changed files with 183 additions and 192 deletions

119
DB.py
View File

@ -116,8 +116,7 @@ def mysql_retry(max_retries: int = 3, base_delay: float = 1.0):
}.get(errno, f"MySQL错误{errno}") }.get(errno, f"MySQL错误{errno}")
wait = base_delay * (2 ** (attempt - 1)) wait = base_delay * (2 ** (attempt - 1))
logger.warning( logger.warning(f"[MySQL][{fn.__name__}] 第{attempt}次重试({errno} {reason}{e},等待 {wait:.1f}s...")
f"[MySQL][{fn.__name__}] 第{attempt}次重试({errno} {reason}{e},等待 {wait:.1f}s...")
# 仅对断连类错误尝试重连 # 仅对断连类错误尝试重连
if errno in {2013, 2006}: if errno in {2013, 2006}:
@ -129,10 +128,8 @@ def mysql_retry(max_retries: int = 3, base_delay: float = 1.0):
raise raise
return wrapper return wrapper
return decorator return decorator
def redis_retry(max_retries: int = 3): def redis_retry(max_retries: int = 3):
""" """
装饰器工厂指定最大重试次数 装饰器工厂指定最大重试次数
@ -178,7 +175,6 @@ class DBVidcon:
self.l0_list_key = "video_l0_queue" self.l0_list_key = "video_l0_queue"
self.l1_list_key = "video_l1_queue" self.l1_list_key = "video_l1_queue"
self.l2_list_key = "video_l2_queue" self.l2_list_key = "video_l2_queue"
self.web_list_key = "video_web_queue"
self.report_list = "report_queue" self.report_list = "report_queue"
self.error_list_key = "error_save_queue" self.error_list_key = "error_save_queue"
self.conn = pymysql.connect(**self._MYSQL_CONF) self.conn = pymysql.connect(**self._MYSQL_CONF)
@ -261,13 +257,6 @@ class DBVidcon:
self.redis.lpush(self.l2_list_key, *raws) self.redis.lpush(self.l2_list_key, *raws)
logger.info(f"[写入l2] 已推入 {len(raws)}") logger.info(f"[写入l2] 已推入 {len(raws)}")
@redis_retry(max_retries=3)
def push_web(self, raws):
if isinstance(raws, str):
raws = [raws]
self.redis.lpush(self.web_list_key, *raws)
logger.info(f"[写入web] 已推入 {len(raws)}")
@redis_retry(max_retries=3) @redis_retry(max_retries=3)
def push_report(self, raws): def push_report(self, raws):
"""原子操作:清空列表并写入新数据""" """原子操作:清空列表并写入新数据"""
@ -329,17 +318,6 @@ class DBVidcon:
items = [] items = []
return items return items
@redis_retry(max_retries=3)
def get_web_items(self, count: int = 1):
try:
items = self.fetch_from_redis(count, list_key=self.web_list_key)
except Exception as e:
logger.info("[Redis web pop error]", e)
self.reconnect_redis()
items = []
return items
@redis_retry(max_retries=3) @redis_retry(max_retries=3)
def item_keyword(self, count: int = 20): def item_keyword(self, count: int = 20):
try: try:
@ -395,11 +373,14 @@ class DBVidcon:
@mysql_retry() @mysql_retry()
def get_report_video(self): def get_report_video(self):
sql = """ sql = """
SELECT id, SELECT
id,
name_title, name_title,
link link
FROM sh_dm_fight_records FROM
WHERE status = 1 \ sh_dm_fight_records
WHERE
status = 1
""" """
self.cursor.execute(sql) self.cursor.execute(sql)
return self.cursor.fetchall() return self.cursor.fetchall()
@ -409,10 +390,11 @@ class DBVidcon:
sql = """ sql = """
SELECT DISTINCT report_id SELECT DISTINCT report_id
FROM sh_dm_fight_records FROM sh_dm_fight_records
WHERE status = 2 WHERE
AND subsequent_status = 1 status = 2
AND subsequent_status in (1, 3)
AND report_time != '' AND report_time != ''
AND mid = %s \ AND mid = %s
""" """
self.cursor.execute(sql,did) self.cursor.execute(sql,did)
return self.cursor.fetchall() return self.cursor.fetchall()
@ -420,12 +402,13 @@ class DBVidcon:
@mysql_retry() @mysql_retry()
def getreport_video(self): def getreport_video(self):
sql = """ sql = """
SELECT id, SELECT
id,
v_xid v_xid
FROM sh_dm_fight_records FROM
WHERE is_removed = '' sh_dm_fight_records
or is_removed IS NULL WHERE
or is_removed = 0 \ is_removed = '' or is_removed IS NULL or is_removed = 0
""" """
self.cursor.execute(sql) self.cursor.execute(sql)
return self.cursor.fetchall() return self.cursor.fetchall()
@ -435,7 +418,7 @@ class DBVidcon:
sql = """ sql = """
UPDATE sh_dm_fight_records UPDATE sh_dm_fight_records
SET is_removed = %s SET is_removed = %s
WHERE id = %s \ WHERE id = %s
""" """
self.cursor.execute(sql, (removed_flag, d_id)) self.cursor.execute(sql, (removed_flag, d_id))
self.flush() self.flush()
@ -482,10 +465,9 @@ class DBVidcon:
def update_video_ts_status(self): def update_video_ts_status(self):
sql = """ sql = """
UPDATE sh_dm_video_v2 v UPDATE sh_dm_video_v2 v
JOIN sh_dm_video_author a JOIN sh_dm_video_author a ON v.u_xid = a.u_xid
ON v.u_xid = a.u_xid
SET v.ts_status = 3 SET v.ts_status = 3
WHERE a.white_status = 1; \ WHERE a.white_status = 1;
""" """
self.cursor.execute(sql) self.cursor.execute(sql)
self.flush() self.flush()
@ -506,31 +488,37 @@ class DBVidcon:
while True: while True:
try: try:
sql_op = """ sql_op = """
INSERT INTO sh_dm_video_op_v2 (v_id, v_xid, a_id, level, name_title, INSERT INTO sh_dm_video_op_v2 (
v_id, v_xid, a_id, level, name_title,
keyword, rn, history_status, is_repeat, keyword, rn, history_status, is_repeat,
sort, createtime, updatetime, batch, machine) sort, createtime, updatetime, batch, machine
VALUES (%(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s, ) VALUES (
%(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s,
%(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s, %(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s,
%(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s) \ %(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s
)
""" """
self.cursor.execute(sql_op, data) self.cursor.execute(sql_op, data)
sql_update = """ sql_update = """
INSERT INTO sh_dm_video_v2 (v_id, v_xid, rn, v_name, title, link, INSERT INTO sh_dm_video_v2 (
v_id, v_xid, rn, v_name, title, link,
edition, duration, edition, duration,
public_time, cover_pic, sort, public_time, cover_pic, sort,
u_xid, u_id, u_pic, u_name, u_xid, u_id, u_pic, u_name,
status, createtime, updatetime) status, createtime, updatetime
VALUES (%(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s, ) VALUES (
%(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s,
'', %(duration)s, '', %(duration)s,
%(create_time)s, %(cover_pic)s, %(sort)s, %(create_time)s, %(cover_pic)s, %(sort)s,
%(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s, %(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s,
1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP()) ON DUPLICATE KEY 1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP()
UPDATE )
title = ON DUPLICATE KEY UPDATE
VALUES (title), duration = title = VALUES(title),
VALUES (duration), cover_pic = duration = VALUES(duration),
VALUES (cover_pic), sort = cover_pic = VALUES(cover_pic),
VALUES (sort), updatetime = UNIX_TIMESTAMP(); \ sort = VALUES(sort),
updatetime = UNIX_TIMESTAMP();
""" """
self.cursor.execute(sql_update, data) self.cursor.execute(sql_update, data)
@ -557,25 +545,32 @@ class DBVidcon:
@mysql_retry() @mysql_retry()
def fetch_keyword_title(self, level: int = 99): def fetch_keyword_title(self, level: int = 99):
sql = """ sql = """
SELECT k.keyword, SELECT
k.keyword,
k.rn, k.rn,
t.title AS v_name, t.title AS v_name,
ANY_VALUE(t.level) AS level ANY_VALUE(t.level) AS level
FROM sh_dm_keyword k FROM
sh_dm_keyword k
LEFT JOIN LEFT JOIN
sh_dm_title t ON k.title = t.title sh_dm_title t ON k.title = t.title
WHERE k.status = 1 WHERE
k.status = 1
AND t.status = 1 AND t.status = 1
AND NOT EXISTS (SELECT 1 AND NOT EXISTS (
SELECT 1
FROM sh_dm_black_keyword b FROM sh_dm_black_keyword b
WHERE ( WHERE
(
(b.keyword IS NULL OR b.keyword = '') AND b.title = t.title (b.keyword IS NULL OR b.keyword = '') AND b.title = t.title
) )
OR ( OR (
b.keyword IS NOT NULL AND b.keyword != '' AND b.keyword = k.keyword b.keyword IS NOT NULL AND b.keyword != '' AND b.keyword = k.keyword
)) )
)
AND t.level = %s AND t.level = %s
GROUP BY k.keyword, k.rn; \ GROUP BY
k.keyword, k.rn;
""" """
self.cursor.execute(sql, (level,)) self.cursor.execute(sql, (level,))
return self.cursor.fetchall() return self.cursor.fetchall()
@ -599,7 +594,7 @@ class DBVidcon:
sql = """ sql = """
INSERT INTO sh_dm_batch_log INSERT INTO sh_dm_batch_log
(batch, info, t0, t1, t2, starttime) (batch, info, t0, t1, t2, starttime)
VALUES (%s, %s, %s, %s, %s, %s) \ VALUES (%s, %s, %s, %s, %s, %s)
""" """
try: try:
self.cursor.execute(sql, (batch, level, t0, t1, t2, start_ts)) self.cursor.execute(sql, (batch, level, t0, t1, t2, start_ts))
@ -671,10 +666,6 @@ class DBVidcon:
def l2_empty(self) -> bool: def l2_empty(self) -> bool:
return self.redis.llen(self.l2_list_key) == 0 return self.redis.llen(self.l2_list_key) == 0
@redis_retry(max_retries=3)
def web_empty(self) -> bool:
return self.redis.llen(self.web_list_key) == 0
@redis_retry(max_retries=3) @redis_retry(max_retries=3)
def pop_error_item(self): def pop_error_item(self):
""" """

View File

@ -59,11 +59,11 @@ def get_searchInfo(keyword, level, headers, proxy_name, r=2):
if r == 2: if r == 2:
logger.info(f"NET处理->{keyword},\trn->{proxy_name},\tlevel->{level}") logger.info(f"NET处理->{keyword},\trn->{proxy_name},\tlevel->{level}")
video_list = [] video_list = []
max_page = 2 max_page = 3
limit = 30 limit = 100
endpoint = 'https://api.dailymotion.com/videos' endpoint = 'https://api.dailymotion.com/videos'
if level == 0 or level == 1: if level == 0 or level == 1:
max_page = 3 max_page = 4
limit = 100 limit = 100
for j in range(1, max_page): for j in range(1, max_page):
params = { params = {