import json
import redis
import pymysql
import functools
from redis.exceptions import ConnectionError, TimeoutError
import time, copy, threading
from typing import Dict, Optional
from sqlalchemy import (
    create_engine, MetaData, Table, Column,
    Integer, String, DateTime,
)
from queue import Queue, Empty
from sqlalchemy.dialects.mysql import insert as mysql_insert
from logger import logger
from datetime import datetime

MYSQL_URL = (
    "mysql+pymysql://db_vidcon:rexdK4fhCCiRE4BZ"
    "@192.144.230.75:3306/db_vidcon?charset=utf8mb4"
)
_engine = create_engine(
    MYSQL_URL,
    pool_size=20, max_overflow=10,
    pool_pre_ping=True, pool_recycle=3600,
    future=True,
)
_meta = MetaData()
# Operation-log table
video_op = Table("sh_dm_video_op_v3", _meta,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("v_id", String(64)),
    Column("v_xid", String(64)),
    Column("a_id", Integer, default=0),
    Column("level", Integer),
    Column("name_title", String(100)),
    Column("keyword", String(100)),
    Column("rn", String(50)),
    Column("history_status", String(100)),
    Column("is_repeat", Integer),
    Column("is_piracy", String(2), default='3'),
    Column("sort", Integer),
    Column("createtime", Integer),
    Column("updatetime", Integer),
    Column("operatetime", Integer),
    Column("batch", Integer),
    Column("machine", Integer),
    Column("ts_status", Integer, default=1),
)

# Video table
video = Table("sh_dm_video_v3", _meta,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("v_id", String(64)),
    Column("v_name", String(100), nullable=False),
    Column("v_xid", String(64), nullable=False),
    Column("title", String(255), nullable=False),
    Column("link", String(255), nullable=False),
    Column("edition", String(255), default=''),
    Column("duration", String(11), default='0'),
    Column("watch_number", Integer, default=0),
    Column("follow_number", Integer, default=0),
    Column("video_number", Integer, default=0),
    Column("public_time", DateTime),
    Column("cover_pic", String(255)),
    Column("sort", Integer),
    Column("u_xid", String(64)),
    Column("u_id", String(100)),
    Column("u_pic", String(255)),
    Column("u_name", String(255)),
    Column("status", Integer, default=1),
    Column("createtime", Integer, default=0),
    Column("updatetime", Integer, default=0),
    Column("operatetime", Integer, default=0),
)

# Author table
video_author = Table(
    "sh_dm_video_author",
    _meta,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("u_id", String(64), nullable=True, comment="internal user ID"),
    Column("u_xid", String(64), nullable=False, unique=True, comment="external user ID (e.g. a third-party ID)"),
    Column("u_name", String(64), nullable=False, comment="user name"),
    Column("u_pic", String(255), nullable=True, comment="avatar URL"),
    Column("follow_number", Integer, nullable=True, default=0, comment="follower count"),
    Column("v_number", Integer, nullable=True, default=0, comment="total videos published by the user"),
    Column("pv_number", Integer, nullable=True, default=0, comment="pirated-video count"),
    Column("b_number", Integer, nullable=True, default=0, comment="enforced (taken-down) video count"),
    Column("create_time", DateTime, nullable=False, comment="time first stored"),
    Column("update_time", Integer, nullable=True, comment="update time (UNIX timestamp)"),
)


def mysql_retry(max_retries: int = 3, base_delay: float = 2.0):
    """
    Decorator: catch dropped-connection errors (InterfaceError, or
    OperationalError 2013), reconnect, and retry with exponential backoff.
    """

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(self, *args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    self.conn.ping(reconnect=True)
                    return fn(self, *args, **kwargs)
                except (pymysql.InterfaceError, pymysql.OperationalError) as e:
                    # Only handle 2013 (lost connection); re-raise any other OperationalError.
                    if isinstance(e, pymysql.OperationalError) and e.args[0] != 2013:
                        raise
                    if attempt == max_retries:
                        logger.error("[MySQL] still failing after retries, raising")
                        raise
                    wait = base_delay * (2 ** (attempt - 1))
                    logger.warning(
                        f"[MySQL][{fn.__name__}] retry {attempt} (connection lost: {e}), "
                        f"reconnecting in {wait:.1f}s…"
                    )
                    time.sleep(wait)
                    self._reconnect_mysql()

        return wrapper

    return decorator
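

# Usage sketch (illustrative, not part of the original module): the decorator
# assumes the wrapped method lives on an object exposing `self.conn` (a pymysql
# connection) and a `_reconnect_mysql()` method, as DBVidcon below does:
#
#     class KeywordDao(DBVidcon):
#         @mysql_retry(max_retries=5, base_delay=1.0)
#         def count_keywords(self):
#             self.cursor.execute("SELECT COUNT(*) AS n FROM sh_dm_keyword")
#             return self.cursor.fetchone()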


def redis_retry(max_retries: int = 3):
    """
    Decorator factory: retry the wrapped call up to max_retries times on
    Redis connection errors, reconnecting between attempts.
    """

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(self, *args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return fn(self, *args, **kwargs)
                except (ConnectionError, TimeoutError) as e:
                    logger.info(f"[Redis][{fn.__name__}] attempt {attempt} failed: {e}")
                    self.reconnect_redis()
                    if attempt == max_retries:
                        logger.info("[Redis] connection failed for good")
                        raise
                    logger.info(f"[Redis] reconnected, starting attempt {attempt + 1}…")

        return wrapper

    return decorator
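

# Usage sketch (illustrative): redis_retry only requires a `reconnect_redis()`
# method on the owning object, e.g.
#
#     class QueuePeek(DBVidcon):
#         @redis_retry(max_retries=5)
#         def l1_backlog(self):
#             return self.redis.llen(self.l1_list_key)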


class DBVidcon:
    _MYSQL_CONF = {
        "host": "192.144.230.75",  # "127.0.0.1"
        "port": 3306,              # 3307
        "user": "db_vidcon",
        "password": "rexdK4fhCCiRE4BZ",
        "database": "db_vidcon",
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
    }
    _REDIS_CONF = {
        "host": "192.144.230.75",  # "127.0.0.1"
        "port": 6379,              # 6380
        "password": "qwert@$123!&",
        "decode_responses": True,
    }

    def __init__(self):
        self.l0_list_key = "video_l0_queue"
        self.l1_list_key = "video_l1_queue"
        self.l2_list_key = "video_l2_queue"
        self.report_list = "report_queue"
        self.error_list_key = "error_save_queue"
        self.conn = pymysql.connect(**self._MYSQL_CONF)
        self.cursor = self.conn.cursor()
        self.redis = redis.Redis(**self._REDIS_CONF)

    def _reconnect_mysql(self):
        """Rebuild the MySQL connection and cursor (expected by @mysql_retry)."""
        try:
            self.cursor.close()
            self.conn.close()
        except Exception:
            pass
        self.conn = pymysql.connect(**self._MYSQL_CONF)
        self.cursor = self.conn.cursor()

    def _connect_redis(self):
        """Create (or re-create) the Redis client."""
        self.redis = redis.Redis(**self._REDIS_CONF)

    def reconnect_redis(self):
        """Reconnect Redis after a ConnectionError is caught."""
        try:
            self._connect_redis()
        except Exception as e:
            logger.info(f"[Redis reconnect error] {e}")
            time.sleep(2)

    @redis_retry(max_retries=3)
    def push_record(self, data: dict):
        raw = json.dumps(data, ensure_ascii=False)
        self.redis.lpush(self.error_list_key, raw)

    @redis_retry(max_retries=3)
    def fetch_from_redis(self, count: int = 100, list_key: Optional[str] = None):
        key = list_key
        try:
            raws = self.redis.lpop(key, count)
        except TypeError:
            # Older redis-py has no count argument; fall back to popping one by
            # one, from the same (left) end as the bulk call above.
            raws = []
            for _ in range(count):
                item = self.redis.lpop(key)
                if item is None:
                    break
                raws.append(item)
        except redis.exceptions.ConnectionError as e:
            logger.info(f"[Redis pop error] {e}")
            self.reconnect_redis()
            return []
        if not raws:
            return []
        if isinstance(raws, str):
            raws = [raws]
        out = []
        for raw in raws:
            try:
                out.append((raw, json.loads(raw)))
            except json.JSONDecodeError:
                continue
        return out
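
    # Return shape sketch (illustrative): each element pairs the raw string
    # with its decoded object, so callers can push the raw form back on
    # failure:
    #
    #     for raw, payload in db.fetch_from_redis(50, list_key=db.l1_list_key):
    #         try:
    #             handle(payload)
    #         except Exception:
    #             db.rollback_l1(raw)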

    @mysql_retry()
    def get_account_info(self, mid: str):
        sql = "SELECT `account`,`password` FROM sh_site_accounts WHERE id = %s AND status=1 LIMIT 1"
        self.cursor.execute(sql, (mid,))
        result = self.cursor.fetchone()
        return result

    @redis_retry(max_retries=3)
    def push_l0(self, raws):
        """Push data onto the l0 (urgent) queue."""
        if isinstance(raws, str):
            raws = [raws]
        self.redis.lpush(self.l0_list_key, *raws)
        logger.info(f"[push l0] queued {len(raws)} item(s)")

    @redis_retry(max_retries=3)
    def push_l1(self, payloads):
        """Push data onto the l1 (normal) queue."""
        if isinstance(payloads, str):
            payloads = [payloads]
        self.redis.rpush(self.l1_list_key, *payloads)
        logger.info(f"[push l1] queued {len(payloads)} item(s)")

    @redis_retry(max_retries=3)
    def push_l2(self, raws):
        """Push data onto the l2 (low-priority) queue."""
        if isinstance(raws, str):
            raws = [raws]
        self.redis.lpush(self.l2_list_key, *raws)
        logger.info(f"[push l2] queued {len(raws)} item(s)")

    @redis_retry(max_retries=3)
    def push_report(self, raws):
        """Atomically clear the report list and push the new data."""
        if isinstance(raws, str):
            raws = [raws]

        with self.redis.pipeline() as pipe:
            # Start the transaction
            pipe.multi()

            # Drop the old list
            pipe.delete(self.report_list)

            # Push the new data, if any
            if raws:
                pipe.rpush(self.report_list, *raws)

            # Execute the transaction
            pipe.execute()

        if raws:
            logger.info(f"[push report] atomically cleared and queued {len(raws)} item(s)")
        else:
            logger.info("[push report] atomically cleared the list")

    @redis_retry(max_retries=3)
    def get_proxy_agent_dict(self) -> dict:
        try:
            meta_json = self.redis.hget("proxy_config", "meta")
        except Exception as e:
            logger.info("[Redis get_proxy_agent_dict error]", exc_info=e)
            self.reconnect_redis()
            return {}

        if not meta_json:
            return {}

        try:
            return json.loads(meta_json)
        except (json.JSONDecodeError, TypeError) as e:
            logger.warning("[Proxy meta JSON decode error]", exc_info=e)
            return {}

    @mysql_retry()
    def get_proxy_parameter(self, rn: str) -> Optional[str]:
        sql = "SELECT parameter FROM proxy_agent WHERE rn = %s LIMIT 1"
        self.cursor.execute(sql, (rn,))
        result = self.cursor.fetchone()
        logger.info(result)
        return result['parameter'] if result else None

    @redis_retry(max_retries=3)
    def item_report(self, count: int = 100):
        try:
            items = self.fetch_from_redis(count, list_key=self.report_list)
        except Exception as e:
            logger.info(f"[Redis report pop error] {e}")
            self.reconnect_redis()
            items = []
        return items

    @redis_retry(max_retries=3)
    def item_keyword(self, count: int = 20):
        try:
            items = self.fetch_from_redis(count, list_key=self.l0_list_key)
        except Exception as e:
            logger.info(f"[Redis l0 pop error] {e}")
            self.reconnect_redis()
            items = []

        if items:
            return items, 0
        try:
            items = self.fetch_from_redis(count, list_key=self.l1_list_key)
        except Exception as e:
            logger.info(f"[Redis l1 pop error] {e}")
            self.reconnect_redis()
            items = []

        if items:
            return items, 1
        try:
            items = self.fetch_from_redis(count, list_key=self.l2_list_key)
        except Exception as e:
            logger.info(f"[Redis l2 pop error] {e}")
            self.reconnect_redis()
            items = []
        if items:
            return items, 2
        logger.info("[Redis queues empty] all queues are empty")
        return items, 99
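
    # Consumer sketch (illustrative): item_keyword() drains the queues in
    # priority order and tags each batch with its source level, so a worker
    # can return failures to the queue they came from:
    #
    #     rollback = {0: db.rollback_l0, 1: db.rollback_l1, 2: db.rollback_l2}
    #     items, level = db.item_keyword()
    #     for raw, payload in items:
    #         try:
    #             process(payload)
    #         except Exception:
    #             rollback[level](raw)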

    @redis_retry(max_retries=3)
    def rollback_l1(self, payloads):
        if isinstance(payloads, str):
            payloads = [payloads]
        self.redis.rpush(self.l1_list_key, *payloads)
        logger.info(f"[rollback l1] returned {len(payloads)} item(s)")

    @redis_retry(max_retries=3)
    def rollback_l0(self, raws):
        if isinstance(raws, str):
            raws = [raws]
        self.redis.lpush(self.l0_list_key, *raws)
        logger.info(f"[rollback l0] returned {len(raws)} item(s)")

    @redis_retry(max_retries=3)
    def rollback_l2(self, raws):
        if isinstance(raws, str):
            raws = [raws]
        self.redis.lpush(self.l2_list_key, *raws)
        logger.info(f"[rollback l2] returned {len(raws)} item(s)")

    @mysql_retry()
    def get_report_video(self):
        sql = """
              SELECT id, name_title, link
              FROM sh_dm_fight_records
              WHERE status = 1
              """
        self.cursor.execute(sql)
        return self.cursor.fetchall()

    @mysql_retry()
    def get_subsequent_report_video(self, did: int):
        sql = """
              SELECT DISTINCT report_id
              FROM sh_dm_fight_records
              WHERE status = 2
                AND subsequent_status = 1
                AND report_time != ''
                AND mid = %s
              """
        self.cursor.execute(sql, (did,))
        return self.cursor.fetchall()

    @mysql_retry()
    def getreport_video(self):
        sql = """
              SELECT id, v_xid
              FROM sh_dm_fight_records
              WHERE is_removed = '' OR is_removed IS NULL OR is_removed = 0
              """
        self.cursor.execute(sql)
        return self.cursor.fetchall()

    @mysql_retry()
    def mark_video_removed(self, d_id: int, removed_flag: int = 1):
        sql = """
              UPDATE sh_dm_fight_records
              SET is_removed = %s
              WHERE id = %s
              """
        self.cursor.execute(sql, (removed_flag, d_id))
        self.flush()

    @mysql_retry()
    def update_fight_record_status(self, ids, report_id: int, new_status: int, errinfo: str = "",
                                   report_time: int = 0, subsequent_status: int = 1, mid=0):
        if not ids:
            return  # nothing to do for an empty list

        placeholders = ','.join(['%s'] * len(ids))
        sql = f"""
            UPDATE sh_dm_fight_records
            SET status = %s,
                errinfo = %s,
                updata_time = %s,
                report_id = %s,
                subsequent_status = %s,
                report_time = %s,
                mid = %s
            WHERE id IN ({placeholders})
        """
        now_ts = int(time.time())
        params = [new_status, errinfo, now_ts, report_id, subsequent_status, report_time, mid] + ids
        self.cursor.execute(sql, params)
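
    # Note: unlike mark_video_removed(), this method does not commit; callers
    # batching several updates are expected to call flush() themselves, e.g.
    #
    #     db.update_fight_record_status([1, 2, 3], report_id=9, new_status=2)
    #     db.flush()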

    @mysql_retry()
    def update_subsequent_status_by_report_id(self, report_id: int, new_status: int, info: str = ""):
        sql = """
              UPDATE sh_dm_fight_records
              SET subsequent_status = %s,
                  updata_time = UNIX_TIMESTAMP(),
                  subsequent_info = %s
              WHERE report_id = %s
              """
        self.cursor.execute(sql, (new_status, info, report_id))
        self.flush()

    @mysql_retry()
    def update_video_ts_status(self):
        sql = """
              UPDATE sh_dm_video_v2 v
              JOIN sh_dm_video_author a ON v.u_xid = a.u_xid
              SET v.ts_status = 3
              WHERE a.white_status = 1;
              """
        self.cursor.execute(sql)
        self.flush()
        logger.info("[update video ts_status] done")

    @mysql_retry()
    def upsert_video(self, data: dict):
        logger.info(f"DB processing -> {data.get('v_xid')},\tlevel -> {data.get('level')}")
        data.setdefault("a_id", 0)
        data.setdefault("history_status", "")
        data.setdefault("is_piracy", 3)
        data.setdefault("is_repeat", 3)
        data["sort"] = data.get("index", 0)

        max_retries = 1  # retry once beyond the initial attempt
        attempt = 0

        while True:
            try:
                sql_op = """
                    INSERT INTO sh_dm_video_op_v2 (
                        v_id, v_xid, a_id, level, name_title,
                        keyword, rn, history_status, is_repeat,
                        sort, createtime, updatetime, batch, machine
                    ) VALUES (
                        %(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s,
                        %(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s,
                        %(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s
                    )
                """
                self.cursor.execute(sql_op, data)
                sql_update = """
                    INSERT INTO sh_dm_video_v2 (
                        v_id, v_xid, rn, v_name, title, link,
                        edition, duration,
                        public_time, cover_pic, sort,
                        u_xid, u_id, u_pic, u_name,
                        status, createtime, updatetime
                    ) VALUES (
                        %(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s,
                        '', %(duration)s,
                        %(create_time)s, %(cover_pic)s, %(sort)s,
                        %(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s,
                        1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP()
                    )
                    ON DUPLICATE KEY UPDATE
                        title = VALUES(title),
                        duration = VALUES(duration),
                        cover_pic = VALUES(cover_pic),
                        sort = VALUES(sort),
                        updatetime = UNIX_TIMESTAMP();
                """

                self.cursor.execute(sql_update, data)
                break  # success, leave the retry loop

            except Exception as e:

                # Roll back this round's uncommitted changes
                self.conn.rollback()
                logger.info(f"[DB write error] {e}")
                logger.info(f"[offending data]: {data}")

                if attempt < max_retries:
                    attempt += 1
                    logger.info(f"starting attempt {attempt + 1}…")
                    continue
                else:
                    # Still failing after the retry: park the record in Redis
                    logger.info("retry failed, handing the record to Redis for later processing")
                    self.push_record(data)
                    logger.info("[handed over to Redis]")
                    break

    @mysql_retry()
    def fetch_keyword_title(self, level: int = 99):
        sql = """
              SELECT k.keyword,
                     k.rn,
                     t.title            AS v_name,
                     ANY_VALUE(t.level) AS level
              FROM sh_dm_keyword k
              LEFT JOIN sh_dm_title t ON k.title = t.title
              WHERE k.status = 1
                AND t.status = 1
                AND NOT EXISTS (
                    SELECT 1
                    FROM sh_dm_black_keyword b
                    WHERE (
                        (b.keyword IS NULL OR b.keyword = '') AND b.title = t.title
                    )
                    OR (
                        b.keyword IS NOT NULL AND b.keyword != '' AND b.keyword = k.keyword
                    )
                )
                AND t.level = %s
              GROUP BY k.keyword, k.rn;
              """
        self.cursor.execute(sql, (level,))
        return self.cursor.fetchall()

    @mysql_retry()
    def log_batch_start(self, info: Dict) -> Optional[int]:
        batch = info.get("batch")
        level = info.get("level")
        if batch is None or level is None:
            raise ValueError("info must contain both 'batch' and 'level'")
        count = info.get("count", 0)
        if level == 0:
            t0, t1, t2 = count, 0, 0
        elif level == 1:
            t0, t1, t2 = 0, count, 0
        elif level == 9:
            level = 2
            t0, t1, t2 = 0, 0, count
        else:
            # Anything else would leave t0/t1/t2 unbound; fail loudly instead.
            raise ValueError(f"unsupported level: {level!r}")

        start_ts = int(time.time())
        sql = """
              INSERT INTO sh_dm_batch_log
                  (batch, info, t0, t1, t2, starttime)
              VALUES (%s, %s, %s, %s, %s, %s)
              """
        try:
            self.cursor.execute(sql, (batch, level, t0, t1, t2, start_ts))
            self.conn.commit()
            return self.cursor.lastrowid
        except Exception as e:
            logger.info(f"[log_batch_start] insert failed: {e}")
            return None
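
    # Usage sketch (illustrative): level 9 is recorded in the t2 slot as
    # level 2, so a caller might do
    #
    #     row_id = db.log_batch_start({"batch": 20240601, "level": 9, "count": 500})
    #     if row_id is None:
    #         logger.warning("batch log insert failed")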

    @mysql_retry()
    def flush(self):
        """Commit manually after a batch of statements."""
        self.conn.commit()

    def close(self):
        try:
            if self.cursor:
                self.cursor.close()
        finally:
            if self.conn:
                self.conn.close()
        try:
            if hasattr(self, "redis") and self.redis:
                # redis-py ≥ 4.2 recommends calling .close() directly
                if hasattr(self.redis, "close"):
                    self.redis.close()
                else:
                    self.redis.connection_pool.disconnect()
        except Exception as e:
            logger.info(f"[Redis close error] {e}")

    @redis_retry(max_retries=3)
    def get_proxy(self, region_code: str) -> str:
        region_code = region_code.upper()
        list_json_str = self.redis.hget("proxy_config", "list") or "[]"
        conf_str = self.redis.hget("proxy_config", "conf") or "0"

        try:
            proxies = json.loads(list_json_str)
            idx = int(conf_str)
            entry = proxies[idx]
            result = entry.get("data", {}).get(region_code, "")
            if not result:
                return ""  # no proxy configured for this region
            if not result.startswith("http://"):
                result = "http://" + result
            return result
        except (ValueError, IndexError, KeyError, json.JSONDecodeError):
            return ""

    @redis_retry(max_retries=3)
    def queues_empty(self) -> bool:
        """
        Return True only if all three queues are empty; False as soon as any
        of them holds data.
        """
        return (
            self.redis.llen(self.l0_list_key) == 0
            and self.redis.llen(self.l1_list_key) == 0
            and self.redis.llen(self.l2_list_key) == 0
        )

    @redis_retry()
    def l0_empty(self) -> bool:
        return self.redis.llen(self.l0_list_key) == 0

    @redis_retry()
    def l1_empty(self) -> bool:
        return self.redis.llen(self.l1_list_key) == 0

    @redis_retry()
    def l2_empty(self) -> bool:
        return self.redis.llen(self.l2_list_key) == 0

    @redis_retry(max_retries=3)
    def pop_error_item(self):
        """
        Pop one error record from error_list_key (lpop);
        returns None when the queue is empty.
        """
        item = self.redis.lpop(self.error_list_key)
        # Entries are stored as JSON strings, so deserialize here:
        return json.loads(item) if item is not None else None
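

# Illustrative helper (an addition for clarity, not part of the original
# module): replay records parked in error_save_queue through the SQLAlchemy
# batch writer DBSA defined below.
def drain_error_queue(db: "DBVidcon", limit: int = 1000):
    for _ in range(limit):
        item = db.pop_error_item()
        if item is None:
            break
        DBSA.upsert_video(item)
    DBSA.flush()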


class DBSA:
    # ======= tunables =======
    FLUSH_EVERY_ROWS = 100      # row-count threshold
    FLUSH_INTERVAL = 30         # seconds threshold
    MAX_SQL_RETRY = 3           # deadlock retry spins per statement
    SQL_RETRY_BASE_SLEEP = 0.5  # base backoff for those spins
    FLUSH_RETRY = 3             # whole-flush retry rounds
    DELAY_ON_FAIL = 10          # wait between failed flush rounds
    DEADLOCK_ERRNO = 1213       # MySQL deadlock error code
    LOCK_TIMEOUT = 3            # mutex acquire timeout (seconds)
    # ========================

    # ----- buffers -----
    _buf_op = []
    _buf_vid = []
    _buf_payload = []
    _last_flush = time.time()

    # ----- concurrency control -----
    _lock = threading.Lock()
    _existing_op_keys = set()
    _existing_vid_keys = set()

    # ----- queue / background-thread mode -----
    _queue_mode = False
    _queue = Queue()

    # ================== hand-back to Redis (stub) ==================
    @staticmethod
    def push_record_many(rows):
        # NOTE: only logs; a real hand-back would push the rows to a Redis queue.
        logger.warning("[hand back to Redis] cnt=%d", len(rows))

    # ----------------------------------------------------
    # public entry point
    # ----------------------------------------------------
    @classmethod
    def upsert_video(cls, data: dict):
        if cls._queue_mode:
            cls._queue.put(data)
            return

        # ---------- deep-copy the payload / fill defaults ----------
        data = copy.deepcopy(data)
        data.setdefault("a_id", 0)
        data.setdefault("is_repeat", 3)
        data.setdefault("keyword", "")
        data["sort"] = data.get("index", 0)
        now_ts = int(time.time())

        op_index_key = (data["v_xid"] or "", data["keyword"] or "", now_ts)
        vid_index_key = (data["v_xid"] or "", data["v_name"] or "")

        # ---------- step 1: take the mutex ----------
        if not cls._lock.acquire(timeout=cls.LOCK_TIMEOUT):
            logger.error("⚠️ [upsert_video] timed out (%ds) acquiring cls._lock", cls.LOCK_TIMEOUT)
            return
        try:
            # ---------- step 2: dedupe ----------
            if op_index_key in cls._existing_op_keys or vid_index_key in cls._existing_vid_keys:
                return

            # ---------- step 3: build op_row ----------
            op_row = dict(
                v_id=data["v_id"],
                v_xid=data["v_xid"],
                a_id=data["a_id"],
                level=data.get("level", 0),
                name_title=data["v_name"],
                keyword=data["keyword"],
                is_repeat=data["is_repeat"],
                sort=data["sort"],
                createtime=now_ts,  # set on first insert
                updatetime=now_ts,  # the only column later updates touch
                batch=data.get("batch", 0),
                machine=data.get("machine_id", 0),
                is_piracy=data.get("is_piracy", '3'),
                ts_status=data.get("ts_status", 1),
                rn=data.get("rn", ""),
            )

            # ---------- step 4: build vid_row ----------
            vid_row = dict(
                v_id=data["v_id"],
                v_xid=data["v_xid"],
                title=data["title"],
                v_name=data["v_name"],
                link=data["link"],
                edition="",
                duration=str(data["duration"]) if data.get("duration") else '0',
                public_time=data["create_time"],
                cover_pic=data["cover_pic"],
                sort=data["sort"],
                u_xid=data["u_xid"],
                u_id=data["u_id"],
                u_pic=data["u_pic"],
                u_name=data["u_name"],
                status=1,
                createtime=now_ts,
                updatetime=now_ts,
                watch_number=data.get("view", 0),
                follow_number=data.get("fans", 0),
                video_number=data.get("videos", 0),
            )
            # keep only columns that exist on the video table
            vid_row = {k: v for k, v in vid_row.items() if k in video.c}

            # ---------- step 5: buffer ----------
            cls._buf_op.append(op_row)
            cls._buf_vid.append(vid_row)
            cls._buf_payload.append(data)
            cls._existing_op_keys.add(op_index_key)
            cls._existing_vid_keys.add(vid_index_key)
        finally:
            cls._lock.release()

        # ---------- step 6: decide whether to flush ----------
        if (len(cls._buf_vid) >= cls.FLUSH_EVERY_ROWS or
                time.time() - cls._last_flush >= cls.FLUSH_INTERVAL):
            logger.info("flush: row-count or interval threshold reached, writing to DB")
            cls.flush()

    # ----------------------------------------------------
    # single-statement safe execute: deadlock spin + pool logging
    # ----------------------------------------------------
    @classmethod
    def _safe_execute(cls, statement, desc=""):
        for attempt in range(cls.MAX_SQL_RETRY):
            try:
                logger.debug("[%s] borrowing a connection", desc)
                with _engine.begin() as conn:
                    logger.debug("[%s] connection acquired", desc)
                    conn.execute(statement)
                return
            except Exception as e:
                # Only SQLAlchemy DBAPIError carries .orig; anything else yields None.
                err_no = getattr(getattr(e, "orig", None), "args", [None])[0]
                if err_no == cls.DEADLOCK_ERRNO and attempt < cls.MAX_SQL_RETRY - 1:
                    time.sleep(cls.SQL_RETRY_BASE_SLEEP * (attempt + 1))
                    logger.warning("[%s] deadlock, retry %d/%d", desc,
                                   attempt + 1, cls.MAX_SQL_RETRY)
                    continue
                logger.exception("[%s] SQL execution failed", desc)
                raise

    # ----------------------------------------------------
    # outer flush: whole-batch retry
    # ----------------------------------------------------
    @classmethod
    def flush(cls):
        for round_no in range(1, cls.FLUSH_RETRY + 1):
            try:
                cls._flush_once()
                return
            except Exception as e:
                logger.error("[flush] round %d failed: %s", round_no, e)
                if round_no < cls.FLUSH_RETRY:
                    time.sleep(cls.DELAY_ON_FAIL)
                    logger.info("[flush] waiting %ds before the next round…", cls.DELAY_ON_FAIL)
                else:
                    logger.error("[flush] %d consecutive rounds failed, handing back to Redis", cls.FLUSH_RETRY)
                    cls.push_record_many(cls._buf_payload)
                    cls._clear_buffers()
                    return

    # ----------------------------------------------------
    # the actual DB write
    # ----------------------------------------------------
    @classmethod
    def _flush_once(cls):
        t0 = time.time()
        # ---------- copy the buffers, then clear them ----------
        if not cls._lock.acquire(timeout=cls.LOCK_TIMEOUT):
            raise RuntimeError("flush could not acquire cls._lock; possible deadlock")
        try:
            op_rows = cls._buf_op[:]
            vid_rows = cls._buf_vid[:]
            payloads = cls._buf_payload[:]
            cls._clear_buffers()
        finally:
            cls._lock.release()

        if not op_rows and not vid_rows:
            return

        try:
            # ---------- write video_author ----------
            authors_map = {}
            now_ts = int(time.time())
            for d in payloads:
                uxid = d.get("u_xid")
                if not uxid:
                    continue
                authors_map[uxid] = dict(
                    u_xid=uxid,
                    u_id=d.get("u_id", 0),
                    u_name=d.get("u_name"),
                    u_pic=d.get("u_pic"),
                    follow_number=d.get("fans", 0),
                    v_number=d.get("videos", 0),
                    pv_number=0,
                    b_number=0,
                    create_time=datetime.utcnow(),
                    update_time=now_ts,
                )
            if authors_map:
                stmt_auth = mysql_insert(video_author).values(list(authors_map.values()))
                ondup_auth = stmt_auth.on_duplicate_key_update(
                    u_name=stmt_auth.inserted.u_name,
                    u_pic=stmt_auth.inserted.u_pic,
                    follow_number=stmt_auth.inserted.follow_number,
                    v_number=stmt_auth.inserted.v_number,
                    update_time=stmt_auth.inserted.update_time,
                )
                cls._safe_execute(ondup_auth, desc="video_author")

            # ---------- write video_op ----------
            if op_rows:
                stmt_op = mysql_insert(video_op).values(op_rows)
                ondup_op = stmt_op.on_duplicate_key_update(
                    updatetime=stmt_op.inserted.updatetime,
                    # ts_status=stmt_op.inserted.ts_status,
                    is_repeat=stmt_op.inserted.is_repeat,
                )
                cls._safe_execute(ondup_op, desc="video_op")
                logger.info("flush: %d operation rows written", len(op_rows))

            # ---------- write video ----------
            if vid_rows:
                stmt_vid = mysql_insert(video).values(vid_rows)
                ondup_vid = stmt_vid.on_duplicate_key_update(
                    title=stmt_vid.inserted.title,
                    v_name=stmt_vid.inserted.v_name,
                    link=stmt_vid.inserted.link,
                    edition=stmt_vid.inserted.edition,
                    duration=stmt_vid.inserted.duration,
                    watch_number=stmt_vid.inserted.watch_number,
                    follow_number=stmt_vid.inserted.follow_number,
                    video_number=stmt_vid.inserted.video_number,
                    public_time=stmt_vid.inserted.public_time,
                    cover_pic=stmt_vid.inserted.cover_pic,
                    sort=stmt_vid.inserted.sort,
                    u_xid=stmt_vid.inserted.u_xid,
                    u_id=stmt_vid.inserted.u_id,
                    u_pic=stmt_vid.inserted.u_pic,
                    u_name=stmt_vid.inserted.u_name,
                    status=stmt_vid.inserted.status,
                    updatetime=stmt_vid.inserted.updatetime,
                )
                cls._safe_execute(ondup_vid, desc="video")
                logger.info("flush: %d video rows written", len(vid_rows))
        except Exception:
            # The buffers were already cleared above; restore the copied rows
            # so the outer retry in flush() (and its final hand-back to Redis)
            # still has the batch instead of silently losing it.
            with cls._lock:
                cls._buf_op.extend(op_rows)
                cls._buf_vid.extend(vid_rows)
                cls._buf_payload.extend(payloads)
            raise

        logger.debug("[flush] round took %.3f s", time.time() - t0)

    # ----------------------------------------------------
    # clear the buffers
    # ----------------------------------------------------
    @classmethod
    def _clear_buffers(cls):
        cls._buf_op.clear()
        cls._buf_vid.clear()
        cls._buf_payload.clear()
        cls._existing_op_keys.clear()
        cls._existing_vid_keys.clear()
        cls._last_flush = time.time()

    # ----------------------------------------------------
    # 4) optional: single background flusher thread
    # ----------------------------------------------------
    @classmethod
    def start_single_flusher(cls):
        cls._queue_mode = True

        def _worker():
            batch = []
            while True:
                try:
                    data = cls._queue.get(timeout=3)
                    batch.append(data)
                    while True:
                        try:
                            batch.append(cls._queue.get_nowait())
                        except Empty:
                            break
                except Empty:
                    pass

                if batch:
                    for d in batch:
                        cls._buffer_without_lock(d)
                    batch.clear()

                # Flush on the row threshold, or on the time threshold so a
                # slow trickle of records still reaches the database.
                if cls._buf_vid and (len(cls._buf_vid) >= cls.FLUSH_EVERY_ROWS or
                                     time.time() - cls._last_flush >= cls.FLUSH_INTERVAL):
                    cls.flush()

        threading.Thread(target=_worker, daemon=True).start()
        logger.info("background flusher thread started (single-writer mode)")

    # ----------------------------------------------------
    # buffer queue items (no thread lock: single writer)
    # ----------------------------------------------------
    @classmethod
    def _buffer_without_lock(cls, data):
        data = copy.deepcopy(data)
        data.setdefault("is_repeat", 3)
        data.setdefault("keyword", "")
        data["sort"] = data.get("index", 0)
        now_ts = int(time.time())

        op_key = (data["v_xid"] or "", data["keyword"] or "", now_ts)
        vid_key = (data["v_xid"] or "", data["v_name"] or "")
        if op_key in cls._existing_op_keys or vid_key in cls._existing_vid_keys:
            return

        op_row = dict(
            v_id=data["v_id"],
            v_xid=data["v_xid"],
            a_id=data.get("a_id", 0),
            level=data.get("level", 0),
            name_title=data["v_name"],
            keyword=data["keyword"],
            is_repeat=data["is_repeat"],
            sort=data["sort"],
            createtime=now_ts,
            updatetime=now_ts,
            batch=data.get("batch", 0),
            machine=data.get("machine_id", 0),
            is_piracy=data.get("is_piracy", '3'),
            ts_status=data.get("ts_status", 1),
            rn=data.get("rn", ""),
        )
        vid_row = dict(
            v_id=data["v_id"],
            v_xid=data["v_xid"],
            title=data["title"],
            v_name=data["v_name"],
            link=data["link"],
            edition="",
            duration=str(data["duration"]) if data.get("duration") else '0',
            public_time=data["create_time"],
            cover_pic=data["cover_pic"],
            sort=data["sort"],
            u_xid=data["u_xid"],
            u_id=data["u_id"],
            u_pic=data["u_pic"],
            u_name=data["u_name"],
            status=1,
            createtime=now_ts,
            updatetime=now_ts,
            watch_number=data.get("view", 0),
            follow_number=data.get("fans", 0),
            video_number=data.get("videos", 0),
        )
        vid_row = {k: v for k, v in vid_row.items() if k in video.c}

        cls._buf_op.append(op_row)
        cls._buf_vid.append(vid_row)
        cls._buf_payload.append(data)
        cls._existing_op_keys.add(op_key)
        cls._existing_vid_keys.add(vid_key)
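

if __name__ == "__main__":
    # Smoke-test sketch (illustrative; needs a reachable MySQL/Redis). The
    # field names mirror what upsert_video() reads; every value below is
    # made up.
    DBSA.start_single_flusher()
    DBSA.upsert_video({
        "v_id": "demo", "v_xid": "demo-xid", "v_name": "demo", "title": "demo",
        "link": "http://example.com/v/demo", "duration": 60,
        "create_time": datetime.now(), "cover_pic": "", "index": 0,
        "u_xid": "author-xid", "u_id": "author", "u_pic": "", "u_name": "author",
    })
    time.sleep(5)  # give the background flusher a chance to run
    DBSA.flush()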