# DailyMotion/DB.py
import json
import redis
import pymysql
import functools
from redis.exceptions import ConnectionError, TimeoutError
import time, copy, threading
from typing import List, Dict, Optional
from sqlalchemy import (
create_engine, MetaData, Table, Column,
BigInteger, Integer, String, Text, DateTime, tuple_
)
from queue import Queue, Empty
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.exc import OperationalError
from sqlalchemy import select
from logger import logger
from datetime import datetime
MYSQL_URL = (
"mysql+pymysql://db_vidcon:rexdK4fhCCiRE4BZ"
"@192.144.230.75:3306/db_vidcon?charset=utf8mb4"
)
_engine = create_engine(
MYSQL_URL,
pool_size=20, max_overflow=10,
pool_pre_ping=True, pool_recycle=3600,
future=True,
)
_meta = MetaData()
# operation log table
video_op = Table("sh_dm_video_op_v3", _meta,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("v_id", String(64)),
Column("v_xid", String(64)),
Column("a_id", Integer, default=0),
Column("level", Integer),
Column("name_title", String(100)),
Column("keyword", String(100)),
Column("rn", String(50)),
Column("history_status", String(100)),
Column("is_repeat", Integer),
Column("is_piracy", String(2), default='3'),
Column("sort", Integer),
Column("createtime", Integer),
Column("updatetime", Integer),
Column("operatetime", Integer),
Column("batch", Integer),
Column("machine", Integer),
Column("ts_status", Integer, default=1),
)
# video table
video = Table("sh_dm_video_v3", _meta,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("v_id", String(64)),
Column("v_name", String(100), nullable=False),
Column("v_xid", String(64), nullable=False),
Column("title", String(255), nullable=False),
Column("link", String(255), nullable=False),
Column("edition", String(255), default=''),
Column("duration", String(11), default='0'),
Column("watch_number", Integer, default=0),
Column("follow_number", Integer, default=0),
Column("video_number", Integer, default=0),
Column("public_time", DateTime),
Column("cover_pic", String(255)),
Column("sort", Integer),
Column("u_xid", String(64)),
Column("u_id", String(100)),
Column("u_pic", String(255)),
Column("u_name", String(255)),
Column("status", Integer, default=1),
Column("createtime", Integer, default=0),
Column("updatetime", Integer, default=0),
Column("operatetime", Integer, default=0),
)
# author table
video_author = Table(
"sh_dm_video_author",
_meta,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("u_id", String(64), nullable=True, comment="用户内部ID"),
Column("u_xid", String(64), nullable=False, unique=True, comment="用户外部ID如第三方ID"),
Column("u_name", String(64), nullable=False, comment="用户名"),
Column("u_pic", String(255), nullable=True, comment="用户头像URL"),
Column("follow_number", Integer, nullable=True, default=0, comment="粉丝量"),
Column("v_number", Integer, nullable=True, default=0, comment="用户发布的视频总数"),
Column("pv_number", Integer, nullable=True, default=0, comment="盗版视频数"),
Column("b_number", Integer, nullable=True, default=0, comment="打击视频数"),
Column("create_time", DateTime, nullable=False, comment="入库时间"),
Column("update_time", Integer, nullable=True, comment="更新时间UNIX 时间戳)"),
)
def mysql_retry(max_retries: int = 3, base_delay: float = 2.0):
    """
    Decorator factory: on InterfaceError, reconnect and retry the call.
    The delay grows exponentially: base_delay * 2**(attempt-1) seconds.
    """
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(self, *args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    # keep the connection alive; reconnect=True re-opens it if ping fails
                    self.conn.ping(reconnect=True)
                    return fn(self, *args, **kwargs)
                except pymysql.InterfaceError as e:
                    if attempt == max_retries:
                        logger.info("[MySQL] still failing after repeated retries, re-raising")
                        raise
                    wait = base_delay * (2 ** (attempt - 1))
                    logger.info(f"[MySQL][{fn.__name__}] attempt {attempt}: InterfaceError {e}, reconnecting in {wait:.1f}s…")
                    time.sleep(wait)
                    self._reconnect_mysql()
        return wrapper
    return decorator
def redis_retry(max_retries: int = 3):
    """
    Decorator factory: retry a Redis call up to max_retries times.
    """
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(self, *args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return fn(self, *args, **kwargs)
                except (ConnectionError, TimeoutError) as e:
                    logger.info(f"[Redis][{fn.__name__}] attempt {attempt} failed: {e}")
                    self.reconnect_redis()
                    if attempt == max_retries:
                        logger.info("[Redis] connection failed for good")
                        raise
                    logger.info(f"[Redis] reconnected, retry {attempt + 1} coming up…")
        return wrapper
    return decorator
class DBVidcon:
_MYSQL_CONF = {
"host": "192.144.230.75", # "127.0.0.1", #
"port": 3306, # 3307, #
"user": "db_vidcon",
"password": "rexdK4fhCCiRE4BZ",
"database": "db_vidcon",
"charset": "utf8mb4",
"cursorclass": pymysql.cursors.DictCursor,
}
_REDIS_CONF = {
"host": "192.144.230.75", # "127.0.0.1", #
"port": 6379, # 6380, #
"password": "qwert@$123!&",
"decode_responses": True,
}
def __init__(self):
self.l0_list_key = "video_l0_queue"
self.l1_list_key = "video_l1_queue"
self.l2_list_key = "video_l2_queue"
self.report_list = "report_queue"
self.error_list_key = "error_save_queue"
self.conn = pymysql.connect(**self._MYSQL_CONF)
self.cursor = self.conn.cursor()
self.redis = redis.Redis(**self._REDIS_CONF)
    def _connect_redis(self):
        """Create (or re-create) the Redis client."""
        self.redis = redis.Redis(**self._REDIS_CONF)
    def reconnect_redis(self):
        """Rebuild the Redis connection after a ConnectionError."""
        try:
            self._connect_redis()
        except Exception as e:
            logger.info(f"[Redis reconnect error] {e}")
            time.sleep(2)
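    def _reconnect_mysql(self):
        # mysql_retry() calls self._reconnect_mysql(), but no such method appears
        # in this section; a minimal sketch (assumed implementation) that rebuilds
        # the pymysql connection and cursor from _MYSQL_CONF:
        try:
            self.cursor.close()
            self.conn.close()
        except Exception:
            pass  # the old connection may already be unusable
        self.conn = pymysql.connect(**self._MYSQL_CONF)
        self.cursor = self.conn.cursor()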
@redis_retry(max_retries=3)
def push_record(self, data: dict):
raw = json.dumps(data, ensure_ascii=False)
self.redis.lpush(self.error_list_key, raw)
@redis_retry(max_retries=3)
    def fetch_from_redis(self, count: int = 100, list_key: str = None):
        key = list_key
        if key is None:
            return []
        try:
            raws = self.redis.lpop(key, count)
        except TypeError:
            # older redis-py: lpop() takes no count argument, pop one at a time
            raws = []
            for _ in range(count):
                item = self.redis.lpop(key)
                if item is None:
                    break
                raws.append(item)
        except redis.exceptions.ConnectionError as e:
            logger.info(f"[Redis pop error] {e}")
            self.reconnect_redis()
            return []
if not raws:
return []
if isinstance(raws, str):
raws = [raws]
out = []
for raw in raws:
try:
out.append((raw, json.loads(raw)))
except json.JSONDecodeError:
continue
return out
@mysql_retry()
def get_account_info(self, mid: str):
sql = "SELECT `account`,`password` FROM sh_site_accounts WHERE id = %s AND status=1 LIMIT 1"
self.cursor.execute(sql, (mid,))
result = self.cursor.fetchone()
return result
    @redis_retry(max_retries=3)
    def push_l0(self, raws):
        """Push data onto the l0 (urgent) queue."""
        if isinstance(raws, str):
            raws = [raws]
        self.redis.lpush(self.l0_list_key, *raws)
        logger.info(f"[push l0] queued {len(raws)} item(s)")
    @redis_retry(max_retries=3)
    def push_l1(self, payloads):
        """Push data onto the l1 (normal) queue."""
        if isinstance(payloads, str):
            payloads = [payloads]
        self.redis.rpush(self.l1_list_key, *payloads)
        logger.info(f"[push l1] queued {len(payloads)} item(s)")
    @redis_retry(max_retries=3)
    def push_l2(self, raws):
        """Push data onto the l2 (low-priority) queue."""
        if isinstance(raws, str):
            raws = [raws]
        self.redis.lpush(self.l2_list_key, *raws)
        logger.info(f"[push l2] queued {len(raws)} item(s)")
    @redis_retry(max_retries=3)
    def push_report(self, raws):
        """Atomic operation: clear the report list, then push the new data."""
        if isinstance(raws, str):
            raws = [raws]
        with self.redis.pipeline() as pipe:
            # start the transaction
            pipe.multi()
            # drop the old list
            pipe.delete(self.report_list)
            # push the new data, if any
            if raws:
                pipe.rpush(self.report_list, *raws)
            # run the transaction
            pipe.execute()
        if raws:
            logger.info(f"[push report] atomically cleared the list and pushed {len(raws)} new item(s)")
        else:
            logger.info("[push report] atomically cleared the list")
@redis_retry(max_retries=3)
def get_proxy_agent_dict(self) -> dict:
try:
meta_json = self.redis.hget("proxy_config", "meta")
except Exception as e:
logger.info("[Redis get_proxy_parameter error]", exc_info=e)
self.reconnect_redis()
return {}
if not meta_json:
return {}
try:
return json.loads(meta_json)
except (json.JSONDecodeError, TypeError) as e:
logger.warning("[Proxy meta JSON decode error]", exc_info=e)
return {}
@mysql_retry()
def get_proxy_parameter(self, rn: str) -> str:
sql = "SELECT parameter FROM proxy_agent WHERE rn = %s LIMIT 1"
self.cursor.execute(sql, (rn,))
result = self.cursor.fetchone()
logger.info(result)
return result['parameter'] if result else None
    @redis_retry(max_retries=3)
    def item_report(self, count: int = 100):
        try:
            items = self.fetch_from_redis(count, list_key=self.report_list)
        except Exception as e:
            logger.info(f"[Redis report pop error] {e}")
            self.reconnect_redis()
            items = []
        return items
    @redis_retry(max_retries=3)
    def item_keyword(self, count: int = 20):
        try:
            items = self.fetch_from_redis(count, list_key=self.l0_list_key)
        except Exception as e:
            logger.info(f"[Redis l0 pop error] {e}")
            self.reconnect_redis()
            items = []
        if items:
            return items, 0
        try:
            items = self.fetch_from_redis(count, list_key=self.l1_list_key)
        except Exception as e:
            logger.info(f"[Redis l1 pop error] {e}")
            self.reconnect_redis()
            items = []
        if items:
            return items, 1
        try:
            items = self.fetch_from_redis(count, list_key=self.l2_list_key)
        except Exception as e:
            logger.info(f"[Redis l2 pop error] {e}")
            self.reconnect_redis()
            items = []
        if items:
            return items, 2
        logger.info("[Redis queues empty] all queues are empty")
        return items, 99
@redis_retry(max_retries=3)
def rollback_l1(self, payloads):
if isinstance(payloads, str):
payloads = [payloads]
self.redis.rpush(self.l1_list_key, *payloads)
logger.info(f"[回滚l1] 已退回 {len(payloads)}")
@redis_retry(max_retries=3)
def rollback_l0(self, raws):
if isinstance(raws, str):
raws = [raws]
self.redis.lpush(self.l0_list_key, *raws)
logger.info(f"[回滚l0] 已退回 {len(raws)}")
@redis_retry(max_retries=3)
def rollback_l2(self, raws):
if isinstance(raws, str):
raws = [raws]
self.redis.lpush(self.l2_list_key, *raws)
logger.info(f"[回滚l2] 已退回 {len(raws)}")
@mysql_retry()
def get_report_video(self):
sql = """
SELECT
id,
name_title,
link
FROM
sh_dm_fight_records
WHERE
status = 1
"""
self.cursor.execute(sql)
return self.cursor.fetchall()
@mysql_retry()
    def get_subsequent_report_video(self, did: int):
sql = """
SELECT DISTINCT report_id
FROM sh_dm_fight_records
WHERE
status = 2
AND subsequent_status = 1
AND report_time != ''
AND mid = %s
"""
        self.cursor.execute(sql, (did,))
return self.cursor.fetchall()
@mysql_retry()
def getreport_video(self):
sql = """
SELECT
id,
v_xid
FROM
sh_dm_fight_records
WHERE
is_removed = '' or is_removed IS NULL or is_removed = 0
"""
self.cursor.execute(sql)
return self.cursor.fetchall()
@mysql_retry()
def mark_video_removed(self, d_id: int, removed_flag: int = 1):
sql = """
UPDATE sh_dm_fight_records
SET is_removed = %s
WHERE id = %s
"""
self.cursor.execute(sql, (removed_flag, d_id))
self.flush()
@mysql_retry()
def update_fight_record_status(self, ids, report_id: int, new_status: int, errinfo: str = "",
report_time: int = 0, subsequent_status: int = 1, mid=0):
if not ids:
            return  # nothing to update for an empty list
placeholders = ','.join(['%s'] * len(ids))
sql = f"""
UPDATE
sh_dm_fight_records
SET
status = %s,
errinfo = %s,
updata_time = %s,
report_id = %s,
subsequent_status = %s,
report_time = %s,
mid = %s
WHERE
id IN ({placeholders})
"""
now_ts = int(time.time())
params = [new_status, errinfo, now_ts, report_id, subsequent_status, report_time, mid] + ids
self.cursor.execute(sql, params)
@mysql_retry()
def update_subsequent_status_by_report_id(self, report_id: int, new_status: int, info: str = ""):
sql = """
UPDATE
sh_dm_fight_records
SET subsequent_status = %s,
updata_time = UNIX_TIMESTAMP(),
subsequent_info = %s
WHERE report_id = %s \
"""
self.cursor.execute(sql, (new_status, info, report_id))
self.flush()
@mysql_retry()
def update_video_ts_status(self):
sql = """
UPDATE sh_dm_video_v2 v
JOIN sh_dm_video_author a ON v.u_xid = a.u_xid
SET v.ts_status = 3
WHERE a.white_status = 1;
"""
self.cursor.execute(sql)
self.flush()
logger.info("[更新视频举报状态] 已执行完毕")
@mysql_retry()
    def upsert_video(self, data: dict):
        logger.info(f"DB processing -> {data.get('v_xid')}\tlevel -> {data.get('level')}")
        data.setdefault("a_id", 0)
        data.setdefault("history_status", "")
        data.setdefault("is_piracy", 3)
        data.setdefault("is_repeat", 3)
        data["sort"] = data.get("index", 0)
        max_retries = 1  # one extra retry beyond the first attempt
attempt = 0
while True:
try:
sql_op = """
INSERT INTO sh_dm_video_op_v2 (
v_id, v_xid, a_id, level, name_title,
keyword, rn, history_status, is_repeat,
sort, createtime, updatetime, batch, machine
) VALUES (
%(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s,
%(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s,
%(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s
)
"""
self.cursor.execute(sql_op, data)
sql_update = """
INSERT INTO sh_dm_video_v2 (
v_id, v_xid, rn, v_name, title, link,
edition, duration,
public_time, cover_pic, sort,
u_xid, u_id, u_pic, u_name,
status, createtime, updatetime
) VALUES (
%(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s,
'', %(duration)s,
%(create_time)s, %(cover_pic)s, %(sort)s,
%(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s,
1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP()
)
ON DUPLICATE KEY UPDATE
title = VALUES(title),
duration = VALUES(duration),
cover_pic = VALUES(cover_pic),
sort = VALUES(sort),
updatetime = UNIX_TIMESTAMP();
"""
self.cursor.execute(sql_update, data)
                break  # success, exit the retry loop
            except Exception as e:
                # roll back this attempt's uncommitted changes
                self.conn.rollback()
                logger.info(f"[DB write error] {e}")
                logger.info(f"[offending data]: {data}")
                if attempt < max_retries:
                    attempt += 1
                    logger.info(f"retry #{attempt}…")
                    continue
                else:
                    # still failing after the retry, park the data in Redis
                    logger.info("retries exhausted, pushing the data to Redis for later processing")
                    self.push_record(data)
                    logger.info("[handed off to Redis]")
                    break
@mysql_retry()
def fetch_keyword_title(self, level: int = 99):
sql = """
SELECT
k.keyword,
k.rn,
t.title AS v_name,
ANY_VALUE(t.level) AS level
FROM
sh_dm_keyword k
LEFT JOIN
sh_dm_title t ON k.title = t.title
WHERE
k.status = 1
AND t.status = 1
AND NOT EXISTS (
SELECT 1
FROM sh_dm_black_keyword b
WHERE
(
(b.keyword IS NULL OR b.keyword = '') AND b.title = t.title
)
OR (
b.keyword IS NOT NULL AND b.keyword != '' AND b.keyword = k.keyword
)
)
AND t.level = %s
GROUP BY
k.keyword, k.rn;
"""
self.cursor.execute(sql, (level,))
return self.cursor.fetchall()
    @mysql_retry()
    def log_batch_start(self, info: Dict) -> Optional[int]:
        batch = info.get("batch")
        level = info.get("level")
        if batch is None or level is None:
            raise ValueError("info must contain both 'batch' and 'level'")
        count = info.get("count", 0)
        if level == 0:
            t0, t1, t2 = count, 0, 0
        elif level == 1:
            t0, t1, t2 = 0, count, 0
        elif level == 9:
            level = 2
            t0, t1, t2 = 0, 0, count
        else:
            # unknown level: record zero counts instead of raising UnboundLocalError
            t0 = t1 = t2 = 0
start_ts = int(time.time())
sql = """
INSERT INTO sh_dm_batch_log
(batch, info, t0, t1, t2, starttime)
VALUES (%s, %s, %s, %s, %s, %s)
"""
try:
self.cursor.execute(sql, (batch, level, t0, t1, t2, start_ts))
self.conn.commit()
return self.cursor.lastrowid
except Exception as e:
logger.info(f"[log_batch_start] 插入失败:{e}")
return None
@mysql_retry()
def flush(self):
"""批量执行完后手动提交。"""
self.conn.commit()
def close(self):
try:
if self.cursor:
self.cursor.close()
finally:
if self.conn:
self.conn.close()
        try:
            if hasattr(self, "redis") and self.redis:
                # redis-py >= 4.2 exposes .close(); older versions need the pool disconnect
                if hasattr(self.redis, "close"):
                    self.redis.close()
                else:
                    self.redis.connection_pool.disconnect()
        except Exception as e:
            logger.info(f"[Redis close error] {e}")
@redis_retry(max_retries=3)
def get_proxy(self, region_code: str) -> str:
region_code = region_code.upper()
list_json_str = self.redis.hget("proxy_config", "list") or "[]"
conf_str = self.redis.hget("proxy_config", "conf") or "0"
try:
proxies = json.loads(list_json_str)
idx = int(conf_str)
entry = proxies[idx]
result = entry.get("data", {}).get(region_code, "")
if not result.startswith("http://"):
result = "http://" + result
return result
except (ValueError, IndexError, KeyError, json.JSONDecodeError):
return ""
@redis_retry(max_retries=3)
def queues_empty(self) -> bool:
"""
如果都空,返回 True只要有一个不空就返回 False。
"""
return (
self.redis.llen(self.l0_list_key) == 0
and self.redis.llen(self.l1_list_key) == 0
and self.redis.llen(self.l2_list_key) == 0
)
@redis_retry()
def l0_empty(self) -> bool:
return self.redis.llen(self.l0_list_key) == 0
@redis_retry()
def l1_empty(self) -> bool:
return self.redis.llen(self.l1_list_key) == 0
@redis_retry()
def l2_empty(self) -> bool:
return self.redis.llen(self.l2_list_key) == 0
@redis_retry(max_retries=3)
    def pop_error_item(self):
        """
        lpop one error record from error_list_key;
        return None if the queue is empty.
        """
        item = self.redis.lpop(self.error_list_key)
        # records are stored as JSON strings, so deserialize before returning
        return json.loads(item) if item is not None else None
class DBSA:
    # ======= tunables =======
    FLUSH_EVERY_ROWS = 100        # row-count threshold
    FLUSH_INTERVAL = 30           # time threshold, seconds
    MAX_SQL_RETRY = 3             # per-statement deadlock spin count
    SQL_RETRY_BASE_SLEEP = 0.5    # base backoff for the deadlock spin
    FLUSH_RETRY = 3               # whole-flush retry rounds
    DELAY_ON_FAIL = 10            # wait between failed flush rounds
    DEADLOCK_ERRNO = 1213         # MySQL deadlock error code
    LOCK_TIMEOUT = 3              # mutex acquire timeout, seconds
    # ========================
    # ----- buffers -----
    _buf_op = []
    _buf_vid = []
    _buf_payload = []
    _last_flush = time.time()
    # ----- concurrency control -----
    _lock = threading.Lock()
    _existing_op_keys = set()
    _existing_vid_keys = set()
    # ----- queue / background-thread mode -----
    _queue_mode = False
    _queue = Queue()
    # ================== simulated "return to Redis" fallback ==================
    @staticmethod
    def push_record_many(rows):
        logger.warning("[return to Redis] cnt=%d", len(rows))
# ----------------------------------------------------
    # public entry point
# ----------------------------------------------------
    @classmethod
    def upsert_video(cls, data: dict):
        """
        Worker threads/processes call this to write a record.
        In queue mode the record is simply dropped onto the queue.
        """
        if cls._queue_mode:
            cls._queue.put(data)
            return
        # ---------- deep-copy the data / apply defaults ----------
        data = copy.deepcopy(data)
        data.setdefault("a_id", 0)
        data.setdefault("is_repeat", 3)
        data.setdefault("keyword", "")
        data["sort"] = data.get("index", 0)
        now_ts = int(time.time())
        op_index_key = (data["v_xid"] or "", data["keyword"] or "", now_ts)
        # -- dedupe on v_name to avoid title collisions --
        vid_index_key = (data["v_xid"] or "", data["v_name"] or "")
        # ---------- (1) acquire the mutex ----------
        if not cls._lock.acquire(timeout=cls.LOCK_TIMEOUT):
            logger.error("⚠️ [upsert_video] timed out acquiring cls._lock after %ds", cls.LOCK_TIMEOUT)
            return
        try:
            # ---------- (2) dedupe ----------
            if op_index_key in cls._existing_op_keys or vid_index_key in cls._existing_vid_keys:
                return
            # ---------- (3) build op_row ----------
            op_row = dict(
                v_id=data["v_id"],
                v_xid=data["v_xid"],
                a_id=data["a_id"],
                level=data.get("level", 0),
                name_title=data["v_name"][:100],  # truncate to fit the 100-char column
keyword=data["keyword"],
is_repeat=data["is_repeat"],
sort=data["sort"],
createtime=now_ts,
updatetime=now_ts,
operatetime=now_ts,
batch=data.get("batch", 0),
machine=data.get("machine_id", 0),
is_piracy=data.get("is_piracy", '3'),
ts_status=data.get("ts_status", 1),
rn=data.get("rn", ""),
)
            # ---------- (4) build vid_row ----------
vid_row = dict(
v_id=data["v_id"],
v_xid=data["v_xid"],
title=data["title"],
v_name=data["v_name"],
link=data["link"],
edition="",
duration=str(data["duration"]) if data.get("duration") else '0',
public_time=data["create_time"],
cover_pic=data["cover_pic"],
sort=data["sort"],
u_xid=data["u_xid"],
u_id=data["u_id"],
u_pic=data["u_pic"],
u_name=data["u_name"],
status=1,
createtime=now_ts,
updatetime=now_ts,
operatetime=now_ts,
watch_number=data.get("view", 0),
follow_number=data.get("fans", 0),
video_number=data.get("videos", 0),
)
            # keep only columns that exist on the video table
            vid_row = {k: v for k, v in vid_row.items() if k in video.c}
            # ---------- (5) append to the buffers ----------
cls._buf_op.append(op_row)
cls._buf_vid.append(vid_row)
cls._buf_payload.append(data)
cls._existing_op_keys.add(op_index_key)
cls._existing_vid_keys.add(vid_index_key)
        finally:
            cls._lock.release()
        # ---------- (6) flush if a threshold was hit ----------
        if (len(cls._buf_vid) >= cls.FLUSH_EVERY_ROWS or
                time.time() - cls._last_flush >= cls.FLUSH_INTERVAL):
            logger.info("flush: row-count or time threshold reached, writing to DB")
            cls.flush()
# ----------------------------------------------------
    # safe single-statement execution: deadlock spin + pool logging
# ----------------------------------------------------
    @classmethod
    def _safe_execute(cls, statement, desc=""):
        for attempt in range(cls.MAX_SQL_RETRY):
            try:
                logger.debug("[%s] borrowing a connection", desc)
                with _engine.begin() as conn:
                    logger.debug("[%s] connection acquired", desc)
                    conn.execute(statement)
                return
            except Exception as e:
                # e.orig exists only on SQLAlchemy DBAPI errors, so guard the lookup
                err_args = getattr(getattr(e, "orig", None), "args", None) or [None]
                if err_args[0] == cls.DEADLOCK_ERRNO and attempt < cls.MAX_SQL_RETRY - 1:
                    time.sleep(cls.SQL_RETRY_BASE_SLEEP * (attempt + 1))
                    logger.warning("[%s] deadlock, retry %d/%d", desc,
                                   attempt + 1, cls.MAX_SQL_RETRY)
                    continue
                logger.exception("[%s] SQL execution failed", desc)
                raise
# ----------------------------------------------------
    # outer flush: whole-batch retry
# ----------------------------------------------------
    @classmethod
    def flush(cls):
        for round_no in range(1, cls.FLUSH_RETRY + 1):
            try:
                cls._flush_once()
                return
            except Exception as e:
                logger.error("[flush] round %d failed: %s", round_no, e)
                if round_no < cls.FLUSH_RETRY:
                    logger.info("[flush] retrying in %ds…", cls.DELAY_ON_FAIL)
                    time.sleep(cls.DELAY_ON_FAIL)
                else:
                    logger.error("[flush] %d consecutive rounds failed, returning data to Redis", cls.FLUSH_RETRY)
                    cls.push_record_many(cls._buf_payload)
                    cls._clear_buffers()
                    return
    # ----------------------------------------------------
    # the actual DB write
    # ----------------------------------------------------
    @classmethod
    def _flush_once(cls):
        t0 = time.time()
        # ---------- copy the buffers, then clear them ----------
        if not cls._lock.acquire(timeout=cls.LOCK_TIMEOUT):
            raise RuntimeError("flush could not acquire cls._lock (possible deadlock)")
        try:
            op_rows = cls._buf_op[:]
            vid_rows = cls._buf_vid[:]
            payloads = cls._buf_payload[:]
            cls._clear_buffers()
        finally:
            cls._lock.release()
        if not op_rows and not vid_rows:
            return
        try:
            # ---------- write video_author ----------
            authors_map = {}
            now_ts = int(time.time())
            for d in payloads:
                uxid = d.get("u_xid")
                if not uxid:
                    continue
                authors_map[uxid] = dict(
                    u_xid=uxid,
                    u_id=d.get("u_id", 0),
                    u_name=d.get("u_name"),
                    u_pic=d.get("u_pic"),
                    follow_number=d.get("fans", 0),
                    v_number=d.get("videos", 0),
                    pv_number=0,
                    b_number=0,
                    create_time=datetime.utcnow(),
                    update_time=now_ts,
                )
            if authors_map:
                stmt_auth = mysql_insert(video_author).values(list(authors_map.values()))
                ondup_auth = stmt_auth.on_duplicate_key_update(
                    u_name=stmt_auth.inserted.u_name,
                    u_pic=stmt_auth.inserted.u_pic,
                    follow_number=stmt_auth.inserted.follow_number,
                    v_number=stmt_auth.inserted.v_number,
                    update_time=stmt_auth.inserted.update_time,
                )
                cls._safe_execute(ondup_auth, desc="video_author")
            # ---------- write video_op ----------
            if op_rows:
                stmt_op = mysql_insert(video_op).values(op_rows)
                ondup_op = stmt_op.on_duplicate_key_update(
                    updatetime=stmt_op.inserted.updatetime,
                    operatetime=stmt_op.inserted.operatetime,
                    ts_status=stmt_op.inserted.ts_status,
                    is_repeat=stmt_op.inserted.is_repeat,
                )
                cls._safe_execute(ondup_op, desc="video_op")
                logger.info("flushed %d op rows", len(op_rows))
            # ---------- write video ----------
            if vid_rows:
                stmt_vid = mysql_insert(video).values(vid_rows)
                ondup_vid = stmt_vid.on_duplicate_key_update(
                    title=stmt_vid.inserted.title,
                    v_name=stmt_vid.inserted.v_name,
                    link=stmt_vid.inserted.link,
                    edition=stmt_vid.inserted.edition,
                    duration=stmt_vid.inserted.duration,
                    watch_number=stmt_vid.inserted.watch_number,
                    follow_number=stmt_vid.inserted.follow_number,
                    video_number=stmt_vid.inserted.video_number,
                    public_time=stmt_vid.inserted.public_time,
                    cover_pic=stmt_vid.inserted.cover_pic,
                    sort=stmt_vid.inserted.sort,
                    u_xid=stmt_vid.inserted.u_xid,
                    u_id=stmt_vid.inserted.u_id,
                    u_pic=stmt_vid.inserted.u_pic,
                    u_name=stmt_vid.inserted.u_name,
                    status=stmt_vid.inserted.status,
                    updatetime=stmt_vid.inserted.updatetime,
                    operatetime=stmt_vid.inserted.operatetime,
                )
                cls._safe_execute(ondup_vid, desc="video")
                logger.info("flushed %d video rows", len(vid_rows))
        except Exception:
            # the buffers were cleared before the write; put the copied rows back
            # so flush()'s retry loop (and its final Redis fallback) still sees them
            with cls._lock:
                cls._buf_op[:0] = op_rows
                cls._buf_vid[:0] = vid_rows
                cls._buf_payload[:0] = payloads
            raise
        logger.debug("[flush] this round took %.3f s", time.time() - t0)
# ----------------------------------------------------
    # clear the buffers
# ----------------------------------------------------
@classmethod
def _clear_buffers(cls):
cls._buf_op.clear()
cls._buf_vid.clear()
cls._buf_payload.clear()
cls._existing_op_keys.clear()
cls._existing_vid_keys.clear()
cls._last_flush = time.time()
    # ----------------------------------------------------
    # optional: background single-thread flusher
    # ----------------------------------------------------
    @classmethod
    def start_single_flusher(cls):
        """
        Start the background thread: producers only feed the queue, while the
        flusher writes to the DB serially, eliminating lock contention.
        """
        cls._queue_mode = True
        def _worker():
            batch = []
            while True:
                try:
                    data = cls._queue.get(timeout=3)
                    batch.append(data)
                    # drain the rest of the queue
                    while True:
                        try:
                            batch.append(cls._queue.get_nowait())
                        except Empty:
                            break
                except Empty:
                    pass  # queue is empty for now
                if not batch:
                    continue
                # ---- move the batch into the buffers (no lock needed) ----
                for d in batch:
                    cls._buffer_without_lock(d)
                batch.clear()
                # flush on row count, and also on the time threshold so a small
                # batch does not sit in the buffer indefinitely
                if (len(cls._buf_vid) >= cls.FLUSH_EVERY_ROWS or
                        (cls._buf_vid and
                         time.time() - cls._last_flush >= cls.FLUSH_INTERVAL)):
                    cls.flush()
        threading.Thread(target=_worker, daemon=True).start()
        logger.info("background flusher thread started (single-writer mode)")
# ----------------------------------------------------
    # move queued data into the buffers (no thread lock)
# ----------------------------------------------------
@classmethod
def _buffer_without_lock(cls, data):
data = copy.deepcopy(data)
data.setdefault("is_repeat", 3)
data.setdefault("keyword", "")
data["sort"] = data.get("index", 0)
now_ts = int(time.time())
op_key = (data["v_xid"] or "", data["keyword"] or "", now_ts)
vid_key = (data["v_xid"] or "", data["v_name"] or "")
if op_key in cls._existing_op_keys or vid_key in cls._existing_vid_keys:
return
        # -- op_row, same shape as in upsert_video --
        op_row = dict(
            v_id=data["v_id"],
            v_xid=data["v_xid"],
            a_id=data.get("a_id", 0),
            level=data.get("level", 0),
            name_title=data["v_name"][:100],  # truncate to fit the 100-char column
keyword=data["keyword"],
is_repeat=data["is_repeat"],
sort=data["sort"],
createtime=now_ts,
updatetime=now_ts,
operatetime=now_ts,
batch=data.get("batch", 0),
machine=data.get("machine_id", 0),
is_piracy=data.get("is_piracy", '3'),
ts_status=data.get("ts_status", 1),
rn=data.get("rn", ""),
)
vid_row = dict(
v_id=data["v_id"],
v_xid=data["v_xid"],
title=data["title"],
v_name=data["v_name"],
link=data["link"],
edition="",
duration=str(data["duration"]) if data.get("duration") else '0',
public_time=data["create_time"],
cover_pic=data["cover_pic"],
sort=data["sort"],
u_xid=data["u_xid"],
u_id=data["u_id"],
u_pic=data["u_pic"],
u_name=data["u_name"],
status=1,
createtime=now_ts,
updatetime=now_ts,
operatetime=now_ts,
watch_number=data.get("view", 0),
follow_number=data.get("fans", 0),
video_number=data.get("videos", 0),
)
vid_row = {k: v for k, v in vid_row.items() if k in video.c}
cls._buf_op.append(op_row)
cls._buf_vid.append(vid_row)
cls._buf_payload.append(data)
cls._existing_op_keys.add(op_key)
cls._existing_vid_keys.add(vid_key)
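# ----------------------------------------------------
# Usage sketch (not part of the original module): drain the priority queues
# through DBVidcon and batch the writes through DBSA. Queue payload fields
# are assumed to match what DBSA.upsert_video() expects (v_xid, v_name, ...).
# ----------------------------------------------------
if __name__ == "__main__":
    db = DBVidcon()
    DBSA.start_single_flusher()             # single background writer thread
    try:
        while True:
            items, level = db.item_keyword(count=20)
            if level == 99:                 # every queue is empty
                time.sleep(5)
                continue
            for raw, payload in items:
                DBSA.upsert_video(payload)  # queue mode: this only enqueues
            DBSA.flush()                    # push out any buffered remainder
    finally:
        db.close()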