refactor: clean up database table definitions and SQL statement formatting to improve code readability

晓丰 2025-07-17 10:53:37 +08:00
parent 4efccb5688
commit c922954d3a
2 changed files with 196 additions and 202 deletions

DB.py (117 changed lines)

@@ -116,7 +116,8 @@ def mysql_retry(max_retries: int = 3, base_delay: float = 1.0):
                     }.get(errno, f"MySQL错误{errno}")
                     wait = base_delay * (2 ** (attempt - 1))
-                    logger.warning(f"[MySQL][{fn.__name__}] 第{attempt}次重试({errno} {reason}){e},等待 {wait:.1f}s...")
+                    logger.warning(
+                        f"[MySQL][{fn.__name__}] 第{attempt}次重试({errno} {reason}){e},等待 {wait:.1f}s...")
                     # Only try to reconnect on connection-loss errors
                     if errno in {2013, 2006}:
@@ -128,8 +129,10 @@ def mysql_retry(max_retries: int = 3, base_delay: float = 1.0):
                         raise
         return wrapper
     return decorator
+
+
 def redis_retry(max_retries: int = 3):
     """
     Decorator factory: specify the maximum number of retries
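Note: both decorators follow the same retry-with-exponential-backoff shape. A minimal, self-contained sketch of the pattern (the decorator name and the reconnect hook are illustrative, not this module's actual API; only the connection-loss error codes 2013/2006 come from the diff):

import functools
import logging
import time

logger = logging.getLogger(__name__)

def retry_with_backoff(max_retries: int = 3, base_delay: float = 1.0, reconnect=None):
    """Retry the wrapped call, doubling the wait each attempt: 1s, 2s, 4s, ..."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries:
                        raise  # retries exhausted: let the caller see the error
                    wait = base_delay * (2 ** (attempt - 1))
                    logger.warning("[%s] retry %d after %r, waiting %.1fs",
                                   fn.__name__, attempt, e, wait)
                    if reconnect is not None:
                        reconnect()  # e.g. re-open a dropped connection (MySQL errno 2013/2006)
                    time.sleep(wait)
        return wrapper
    return decorator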
@@ -175,6 +178,7 @@ class DBVidcon:
         self.l0_list_key = "video_l0_queue"
         self.l1_list_key = "video_l1_queue"
         self.l2_list_key = "video_l2_queue"
+        self.web_list_key = "video_web_queue"
         self.report_list = "report_queue"
         self.error_list_key = "error_save_queue"
         self.conn = pymysql.connect(**self._MYSQL_CONF)
@@ -257,6 +261,13 @@ class DBVidcon:
         self.redis.lpush(self.l2_list_key, *raws)
         logger.info(f"[写入l2] 已推入 {len(raws)}")

+    @redis_retry(max_retries=3)
+    def push_web(self, raws):
+        if isinstance(raws, str):
+            raws = [raws]
+        self.redis.lpush(self.web_list_key, *raws)
+        logger.info(f"[写入web] 已推入 {len(raws)}")
+
     @redis_retry(max_retries=3)
     def push_report(self, raws):
         """Atomic operation: clear the list and write the new data"""
@@ -318,6 +329,17 @@ class DBVidcon:
             items = []
         return items

+    @redis_retry(max_retries=3)
+    def get_web_items(self, count: int = 1):
+        try:
+            items = self.fetch_from_redis(count, list_key=self.web_list_key)
+        except Exception as e:
+            logger.info("[Redis web pop error]", e)
+            self.reconnect_redis()
+            items = []
+        return items
+
     @redis_retry(max_retries=3)
     def item_keyword(self, count: int = 20):
         try:
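Note: the new video_web_queue is a plain Redis list used as a work queue; push_web LPUSHes items in and get_web_items drains them via fetch_from_redis. A quick redis-py sketch of that producer/consumer shape (host/port and the popping side are assumptions, since fetch_from_redis is not shown in this diff; the key name is from the diff):

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)  # assumed connection settings
web_list_key = "video_web_queue"  # key name from the diff

# Producer side, as in push_web: a lone string gets wrapped into a list first.
r.lpush(web_list_key, '{"v_xid": "abc123"}')

# Consumer side: RPOP is one plausible implementation, giving FIFO order relative to LPUSH.
item = r.rpop(web_list_key)

# Emptiness check, as in web_empty(): LLEN == 0.
print(r.llen(web_list_key) == 0)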
@@ -373,14 +395,11 @@ class DBVidcon:
     @mysql_retry()
     def get_report_video(self):
         sql = """
-              SELECT
-                  id,
+              SELECT id,
                      name_title,
                      link
-              FROM
-                  sh_dm_fight_records
-              WHERE
-                  status = 1
+              FROM sh_dm_fight_records
+              WHERE status = 1 \
               """
         self.cursor.execute(sql)
         return self.cursor.fetchall()
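Note: the trailing backslash now sitting before each closing triple quote (an artifact of the IDE's SQL reformatting) is a line continuation inside the string literal: it only strips the final newline, so the queries themselves are unchanged. A two-line demonstration:

s1 = """SELECT 1
"""  # keeps the trailing newline
s2 = """SELECT 1 \
"""  # backslash-newline is elided, so no trailing newline
assert s1 == "SELECT 1\n"
assert s2 == "SELECT 1 "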
@@ -390,11 +409,10 @@ class DBVidcon:
         sql = """
               SELECT DISTINCT report_id
               FROM sh_dm_fight_records
-              WHERE
-                  status = 2
+              WHERE status = 2
                 AND subsequent_status = 1
                 AND report_time != ''
-                AND mid = %s
+                AND mid = %s \
               """
         self.cursor.execute(sql, did)
         return self.cursor.fetchall()
@@ -402,13 +420,12 @@ class DBVidcon:
     @mysql_retry()
     def getreport_video(self):
         sql = """
-              SELECT
-                  id,
+              SELECT id,
                      v_xid
-              FROM
-                  sh_dm_fight_records
-              WHERE
-                  is_removed = '' or is_removed IS NULL or is_removed = 0
+              FROM sh_dm_fight_records
+              WHERE is_removed = ''
+                 or is_removed IS NULL
+                 or is_removed = 0 \
               """
         self.cursor.execute(sql)
         return self.cursor.fetchall()
@@ -418,7 +435,7 @@ class DBVidcon:
         sql = """
               UPDATE sh_dm_fight_records
               SET is_removed = %s
-              WHERE id = %s
+              WHERE id = %s \
               """
         self.cursor.execute(sql, (removed_flag, d_id))
         self.flush()
@@ -465,9 +482,10 @@ class DBVidcon:
     def update_video_ts_status(self):
         sql = """
               UPDATE sh_dm_video_v2 v
-                  JOIN sh_dm_video_author a ON v.u_xid = a.u_xid
+                  JOIN sh_dm_video_author a
+                       ON v.u_xid = a.u_xid
               SET v.ts_status = 3
-              WHERE a.white_status = 1;
+              WHERE a.white_status = 1; \
               """
         self.cursor.execute(sql)
         self.flush()
@@ -488,37 +506,31 @@ class DBVidcon:
         while True:
             try:
                 sql_op = """
-                         INSERT INTO sh_dm_video_op_v2 (
-                             v_id, v_xid, a_id, level, name_title,
+                         INSERT INTO sh_dm_video_op_v2 (v_id, v_xid, a_id, level, name_title,
                              keyword, rn, history_status, is_repeat,
-                             sort, createtime, updatetime, batch, machine
-                         ) VALUES (
-                             %(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s,
+                             sort, createtime, updatetime, batch, machine)
+                         VALUES (%(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s,
                              %(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s,
-                             %(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s
-                         )
+                             %(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s) \
                          """
                 self.cursor.execute(sql_op, data)
                 sql_update = """
-                             INSERT INTO sh_dm_video_v2 (
-                                 v_id, v_xid, rn, v_name, title, link,
+                             INSERT INTO sh_dm_video_v2 (v_id, v_xid, rn, v_name, title, link,
                                  edition, duration,
                                  public_time, cover_pic, sort,
                                  u_xid, u_id, u_pic, u_name,
-                                 status, createtime, updatetime
-                             ) VALUES (
-                                 %(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s,
+                                 status, createtime, updatetime)
+                             VALUES (%(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s,
                                  '', %(duration)s,
                                  %(create_time)s, %(cover_pic)s, %(sort)s,
                                  %(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s,
-                                 1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP()
-                             )
-                             ON DUPLICATE KEY UPDATE
-                                 title = VALUES(title),
-                                 duration = VALUES(duration),
-                                 cover_pic = VALUES(cover_pic),
-                                 sort = VALUES(sort),
-                                 updatetime = UNIX_TIMESTAMP();
+                                 1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP()) ON DUPLICATE KEY
+                             UPDATE
+                                 title = VALUES (title), duration = VALUES (duration),
+                                 cover_pic = VALUES (cover_pic), sort = VALUES (sort),
+                                 updatetime = UNIX_TIMESTAMP(); \
                              """
                 self.cursor.execute(sql_update, data)
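Note: sql_update is a MySQL upsert; a collision on the table's unique key (presumably v_xid or v_id) turns the INSERT into an in-place UPDATE of the listed columns. A stripped-down pymysql sketch of the same pattern (connection settings and the demo table are hypothetical):

import pymysql

conn = pymysql.connect(host="localhost", user="app", password="secret", database="demo")
with conn.cursor() as cur:
    # Requires a UNIQUE KEY on v_xid for the duplicate branch to fire.
    cur.execute("""
        INSERT INTO video (v_xid, title, updatetime)
        VALUES (%(v_xid)s, %(title)s, UNIX_TIMESTAMP())
        ON DUPLICATE KEY UPDATE
            title      = VALUES(title),
            updatetime = UNIX_TIMESTAMP()
    """, {"v_xid": "abc123", "title": "new title"})
conn.commit()

(MySQL 8.0.20+ deprecates VALUES() in this position in favor of row aliases, but the diff targets the older syntax.)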
@@ -545,32 +557,25 @@ class DBVidcon:
     @mysql_retry()
     def fetch_keyword_title(self, level: int = 99):
         sql = """
-              SELECT
-                  k.keyword,
+              SELECT k.keyword,
                      k.rn,
                      t.title AS v_name,
                      ANY_VALUE(t.level) AS level
-              FROM
-                  sh_dm_keyword k
+              FROM sh_dm_keyword k
                        LEFT JOIN
                    sh_dm_title t ON k.title = t.title
-              WHERE
-                  k.status = 1
+              WHERE k.status = 1
                 AND t.status = 1
-                AND NOT EXISTS (
-                    SELECT 1
+                AND NOT EXISTS (SELECT 1
                                 FROM sh_dm_black_keyword b
-                                WHERE
-                                    (
+                                WHERE (
                                     (b.keyword IS NULL OR b.keyword = '') AND b.title = t.title
                                     )
                                    OR (
                                     b.keyword IS NOT NULL AND b.keyword != '' AND b.keyword = k.keyword
-                                    )
-                )
+                                    ))
                 AND t.level = %s
-              GROUP BY
-                  k.keyword, k.rn;
+              GROUP BY k.keyword, k.rn; \
               """
         self.cursor.execute(sql, (level,))
         return self.cursor.fetchall()
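Note: the NOT EXISTS block is an anti-join against the blacklist: a keyword/title pair survives only if no sh_dm_black_keyword row matches it, where blank-keyword rows blacklist by title and non-blank rows blacklist by exact keyword. The query reduced to that skeleton (table and column names from the diff):

# Skeleton of the anti-join; one row out per keyword unless some blacklist row matches.
sql = """
    SELECT k.keyword, k.rn
    FROM sh_dm_keyword k
    LEFT JOIN sh_dm_title t ON k.title = t.title
    WHERE NOT EXISTS (SELECT 1
                      FROM sh_dm_black_keyword b
                      WHERE ((b.keyword IS NULL OR b.keyword = '') AND b.title = t.title)
                         OR (b.keyword IS NOT NULL AND b.keyword != '' AND b.keyword = k.keyword))
"""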
@@ -594,7 +599,7 @@ class DBVidcon:
         sql = """
               INSERT INTO sh_dm_batch_log
                   (batch, info, t0, t1, t2, starttime)
-              VALUES (%s, %s, %s, %s, %s, %s)
+              VALUES (%s, %s, %s, %s, %s, %s) \
               """
         try:
             self.cursor.execute(sql, (batch, level, t0, t1, t2, start_ts))
@@ -666,6 +671,10 @@ class DBVidcon:
     def l2_empty(self) -> bool:
         return self.redis.llen(self.l2_list_key) == 0

+    @redis_retry(max_retries=3)
+    def web_empty(self) -> bool:
+        return self.redis.llen(self.web_list_key) == 0
+
     @redis_retry(max_retries=3)
     def pop_error_item(self):
         """
onoe.py (29 changed lines)

@@ -34,19 +34,7 @@ UserAgent = [
     'User-Agent, Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Maxthon 2.0)',
     'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Maxthon/5.0.3.4000 Chrome/47.0.2526.73 Safari/537.36',
     'User-Agent, Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; The World)']
-sec_ch_ua_list = [
-    '"Chromium";v="0", "Not;A=Brand";v="24", "Google Chrome";v="0"',
-    '"Chromium";v="0", "Not;A=Brand";v="24", "Google Chrome";v="0"',
-    '"Chromium";v="0", "Not;A=Brand";v="24", "Google Chrome";v="0"',
-    '"Chromium";v="0", "Not;A=Brand";v="24", "Google Chrome";v="0"',
-    '"Chromium";v="0", "Not;A=Brand";v="24", "Google Chrome";v="0"',
-    '"Chromium";v="136", "Not;A=Brand";v="24", "Google Chrome";v="136"',
-    '"Chromium";v="128", "Not;A=Brand";v="24", "Google Chrome";v="128"',
-    '"Chromium";v="53", "Not;A=Brand";v="24", "Google Chrome";v="53"',
-    '"Chromium";v="0", "Not;A=Brand";v="24", "Google Chrome";v="0"',
-    '"Chromium";v="47", "Not;A=Brand";v="24", "Google Chrome";v="47"',
-    '"Chromium";v="0", "Not;A=Brand";v="24", "Google Chrome";v="0"'
-]

 def get_part_ids(part_num: int, take: int, offset: int = 0):
     part_ids = list(range(offset, offset + take))
@@ -233,17 +221,14 @@ def gettoken(proxy, r=2):
     }
     try:
         proxy_str = db.get_proxy(proxy)
-        logger.info(f"[代理] => {proxy_str}")
         url = 'https://graphql.api.dailymotion.com/oauth/token'
         response = requests.post(url, headers=headers, data=data, proxies={"http": proxy_str, "https": proxy_str})
         token = response.json()['access_token']
         copy_headers = copy.deepcopy(headers1)
-        uaidx = random.randint(0, len(UserAgent) - 1)
         copy_headers['authorization'] = "Bearer " + token
         copy_headers['x-dm-visit-id'] = str(int(time.time() * 1000))
         copy_headers['x-dm-visitor-id'] = uuid_with_dash
-        copy_headers['User-Agent'] = UserAgent[uaidx]
-        copy_headers['sec-ch-ua'] = sec_ch_ua_list[uaidx]
+        copy_headers['User-Agent'] = UserAgent[random.randint(0, len(UserAgent) - 1)]
         copy_headers['X-DM-Preferred-Country'] = proxy.lower()
         with _cache_lock:
             _headers_cache = copy_headers
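Note: gettoken ends by publishing the freshly built header set into a module-level cache under _cache_lock, so worker threads can reuse one token instead of re-authenticating. A minimal sketch of that share-under-a-lock shape (build_headers stands in for the real OAuth exchange and is hypothetical):

import threading

_cache_lock = threading.Lock()
_headers_cache = None  # shared, mutated only while holding the lock

def build_headers() -> dict:
    return {"authorization": "Bearer <token>"}  # placeholder for the real token fetch

def get_headers() -> dict:
    global _headers_cache
    with _cache_lock:
        if _headers_cache is None:
            _headers_cache = build_headers()
        return dict(_headers_cache)  # copy out so callers can't mutate the shared dict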
@@ -282,18 +267,18 @@ def solve_recaptcha_v3_with_proxy(
     payload = {
         "clientKey": "CAP-A76C932D4C6CCB3CA748F77FDC07D996",
         "task": {
-            "type": "ReCaptchaV3TaskProxyLess",
+            "type": "ReCaptchaV3Task",
             "websiteURL": f"https://www.dailymotion.com/search/{encoded_query}/top-results",
             "websiteKey": "6LeOJBIrAAAAAPMIjyYvo-eN_9W1HDOkrEqHR8tM",
-            "pageAction": "search",
+            "pageAction": "___grecaptcha_cfg.clients['100000']['L']['L']['promise-callback'](gRecaptchaResponse)",
             "minScore": 0.5
         }
     }
-    resp = requests.post(create_url, data=json.dumps(payload), headers=headers, timeout=30)
+    resp = requests.post(create_url, json=payload, headers=headers, timeout=30)
     logger.info(f"[token] 发送 payload:{payload}")
     resp.raise_for_status()
     task_id = resp.json()["taskId"]
-    logger.info(f"task_id: {resp.text}")
+    logger.info(f"task_id: {task_id}")
     # Poll for the result
     check_payload = {"clientKey": "CAP-A76C932D4C6CCB3CA748F77FDC07D996", "taskId": task_id}
     for i in range(max_poll_attempts):
@@ -302,7 +287,7 @@ def solve_recaptcha_v3_with_proxy(
         result = r.json()
         logger.info(f"{i}次,task_id:{task_id},结果:{result}")
         if result.get("status") == "ready":
-            return result["solution"]["gRecaptchaResponse"]
+            return result["solution"]["token"]
         time.sleep(polling_interval)
     raise TimeoutError(f"任务 {task_id} 在轮询 {max_poll_attempts} 次后未完成")
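Note: the solver follows the usual create-task/poll contract of captcha-solving APIs: POST the task, read back a taskId, then poll until status is "ready" or the attempt budget runs out. A compact, self-contained sketch (the endpoint URLs are assumptions patterned on the createTask/getTaskResult pair implied by the diff; the solution field name token follows the updated line above):

import time
import requests

def solve_task(api_base: str, client_key: str, task: dict,
               max_poll_attempts: int = 60, polling_interval: float = 2.0) -> str:
    resp = requests.post(f"{api_base}/createTask",
                         json={"clientKey": client_key, "task": task}, timeout=30)
    resp.raise_for_status()
    task_id = resp.json()["taskId"]
    check = {"clientKey": client_key, "taskId": task_id}
    for _ in range(max_poll_attempts):  # bounded loop: fail loudly instead of hanging
        r = requests.post(f"{api_base}/getTaskResult", json=check, timeout=30)
        result = r.json()
        if result.get("status") == "ready":
            return result["solution"]["token"]
        time.sleep(polling_interval)
    raise TimeoutError(f"task {task_id} not ready after {max_poll_attempts} polls")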