# Listing metadata from the code viewer: 642 lines, 22 KiB, Python.
import copy
import functools
import json
import threading
import time
from typing import Dict, List, Optional

import pymysql
import redis
from redis.exceptions import ConnectionError, TimeoutError
from sqlalchemy import (
    BigInteger,
    Column,
    Integer,
    MetaData,
    String,
    Table,
    Text,
    create_engine,
)
from sqlalchemy.dialects.mysql import insert as mysql_insert

from logger import logger

# SQLAlchemy connection URL (pymysql driver, utf8mb4 charset) for the
# db_vidcon database.
# SECURITY NOTE(review): the account password and a routable host address are
# committed in source here; rotate the credential and load it from the
# environment or a secret store instead.
MYSQL_URL = (
    "mysql+pymysql://db_vidcon:rexdK4fhCCiRE4BZ"
    "@192.144.230.75:3306/db_vidcon?charset=utf8mb4"
)

# Shared SQLAlchemy engine used by the batched writer (DBSA).
# pool_pre_ping tests a pooled connection before handing it out and
# pool_recycle replaces connections older than one hour.
_engine = create_engine(
    MYSQL_URL,
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
    pool_recycle=3600,
    future=True,
)

# Metadata registry that the Table definitions in this module attach to.
_meta = MetaData()

# Table handle for sh_dm_video_op_v2; only the columns written by this
# module are declared.
video_op = Table(
    "sh_dm_video_op_v2",
    _meta,
    Column("v_id", BigInteger, primary_key=True),
    Column("v_xid", String(64)),
    Column("a_id", Integer),
    Column("level", Integer),
    Column("name_title", String(255)),
    Column("keyword", String(255)),
    Column("rn", String(8)),
    Column("history_status", String(32)),
    Column("is_repeat", Integer),
    Column("sort", Integer),
    Column("createtime", Integer),
    Column("updatetime", Integer),
    Column("batch", BigInteger),
    Column("machine", Integer),
)

# Table handle for sh_dm_video_v2; the last three columns are the counters
# filled from the payload's "view" / "fans" / "videos" keys by DBSA.
video = Table(
    "sh_dm_video_v2",
    _meta,
    Column("v_id", BigInteger, primary_key=True),
    Column("v_xid", String(64)),
    Column("rn", String(8)),
    Column("v_name", String(255)),
    Column("title", String(255)),
    Column("link", Text),
    Column("edition", String(64)),
    Column("duration", Integer),
    Column("public_time", String(32)),
    Column("cover_pic", Text),
    Column("sort", Integer),
    Column("u_xid", String(64)),
    Column("u_id", BigInteger),
    Column("u_pic", Text),
    Column("u_name", String(255)),
    Column("status", Integer),
    Column("createtime", Integer),
    Column("updatetime", Integer),

    Column("watch_number", Integer),
    Column("follow_number", Integer),
    Column("video_number", Integer),
)


def mysql_retry(max_retries: int = 3, base_delay: float = 2.0):
    """Build a decorator that retries a method after ``pymysql.InterfaceError``.

    The decorated callable must be an instance method of an object that
    exposes ``self.conn`` with a ``ping(reconnect=True)`` method.  Before each
    attempt the connection is pinged so a dropped link is re-established.
    After a failed attempt that is not the last one, the wrapper sleeps
    ``base_delay * 2 ** (attempt - 1)`` seconds and then calls the object's
    ``_reconnect_mysql()`` hook if it defines one.

    Args:
        max_retries: Total number of attempts.  Values below 1 are treated as
            1 so the wrapped method always runs at least once.
        base_delay: Seconds to wait before the second attempt; the wait
            doubles for every further attempt.

    Returns:
        A decorator for instance methods.

    Raises:
        pymysql.InterfaceError: Re-raised unchanged once every attempt failed.
    """
    attempts = max(1, max_retries)

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(self, *args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    # reconnect=True makes ping() re-open a dead connection.
                    self.conn.ping(reconnect=True)
                    return fn(self, *args, **kwargs)
                except pymysql.InterfaceError as e:
                    if attempt == attempts:
                        # Out of attempts: fail immediately, no pointless sleep.
                        logger.info("[MySQL] 重试多次仍失败,抛出异常")
                        raise
                    wait = base_delay * (2 ** (attempt - 1))
                    logger.info(f"[MySQL][{fn.__name__}] 第{attempt}次 InterfaceError:{e},等待 {wait:.1f}s 后重连…")
                    time.sleep(wait)
                    # The hook is optional; without it the ping() at the top
                    # of the next iteration performs the reconnect.
                    reconnect = getattr(self, "_reconnect_mysql", None)
                    if callable(reconnect):
                        reconnect()

        return wrapper

    return decorator


def redis_retry(max_retries: int = 3):
    """Build a decorator that retries a method on Redis connection failures.

    The decorated callable must be an instance method of an object that
    provides ``reconnect_redis()``.  When the call raises ``ConnectionError``
    or ``TimeoutError`` the failure is logged, ``reconnect_redis()`` is
    invoked and the call is repeated; after the last attempt the exception
    is re-raised.

    Args:
        max_retries: Total number of attempts.  Values below 1 are treated as
            1 so the wrapped method always runs at least once.

    Returns:
        A decorator for instance methods.
    """
    attempts = max(1, max_retries)

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(self, *args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return fn(self, *args, **kwargs)
                except (ConnectionError, TimeoutError) as e:
                    logger.info(f"[Redis][{fn.__name__}] 第 {attempt} 次失败:{e}")
                    # Rebuild the client even on the last failure so later
                    # calls on the same object start from a fresh client.
                    self.reconnect_redis()
                    if attempt == attempts:
                        logger.info("[Redis] 连接彻底失败")
                        raise
                    logger.info(f"[Redis] 重连后第 {attempt + 1} 次重试…")

        return wrapper

    return decorator


class DBVidcon:
    """Synchronous MySQL + Redis access object for video records.

    Each instance owns one pymysql connection with a ``DictCursor`` and one
    Redis client.  Redis holds three work queues (``video_l0_queue`` urgent,
    ``video_l1_queue`` normal, ``video_l2_queue`` low priority) and the list
    ``error_save_queue`` that receives records whose MySQL write failed.

    Statements issued by :meth:`upsert_video` are not committed by that
    method; call :meth:`flush` to commit them.
    """

    # SECURITY NOTE(review): live credentials and a routable host are
    # hard-coded in the two dicts below; rotate them and load them from the
    # environment or a secret store.
    _MYSQL_CONF = {
        "host": "192.144.230.75",  # alternative used previously: "127.0.0.1"
        "port": 3306,  # alternative used previously: 3307
        "user": "db_vidcon",
        "password": "rexdK4fhCCiRE4BZ",
        "database": "db_vidcon",
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor,
    }
    _REDIS_CONF = {
        "host": "192.144.230.75",  # alternative used previously: "127.0.0.1"
        "port": 6379,  # alternative used previously: 6380
        "password": "qwert@$123!&",
        "decode_responses": True,
    }

    # Plain INSERT into the operation table (no duplicate handling).
    _SQL_INSERT_OP = """
        INSERT INTO sh_dm_video_op_v2 (
            v_id, v_xid, a_id, level, name_title,
            keyword, rn, history_status, is_repeat,
            sort, createtime, updatetime, batch, machine
        ) VALUES (
            %(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s,
            %(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s,
            %(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s
        )
    """

    # Insert-or-update of the video table; on a duplicate key only title,
    # duration, cover_pic, sort and updatetime are refreshed.
    _SQL_UPSERT_VIDEO = """
        INSERT INTO sh_dm_video_v2 (
            v_id, v_xid, rn, v_name, title, link,
            edition, duration,
            public_time, cover_pic, sort,
            u_xid, u_id, u_pic, u_name,
            status, createtime, updatetime
        ) VALUES (
            %(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s,
            '', %(duration)s,
            %(create_time)s, %(cover_pic)s, %(sort)s,
            %(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s,
            1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP()
        )
        ON DUPLICATE KEY UPDATE
            title = VALUES(title),
            duration = VALUES(duration),
            cover_pic = VALUES(cover_pic),
            sort = VALUES(sort),
            updatetime = UNIX_TIMESTAMP();
    """

    def __init__(self):
        """Open the MySQL connection and cursor and build the Redis client."""
        self.l0_list_key = "video_l0_queue"
        self.l1_list_key = "video_l1_queue"
        self.l2_list_key = "video_l2_queue"
        self.error_list_key = "error_save_queue"
        self.conn = pymysql.connect(**self._MYSQL_CONF)
        self.cursor = self.conn.cursor()
        self.redis = redis.Redis(**self._REDIS_CONF)

    # ------------------------------------------------------------------
    # connection management
    # ------------------------------------------------------------------
    def _reconnect_mysql(self):
        """Replace the MySQL connection and cursor with fresh ones.

        Called by the ``mysql_retry`` decorator between attempts.
        """
        try:
            self.conn.close()
        except pymysql.Error:
            # The old connection may already be closed; nothing to clean up.
            pass
        self.conn = pymysql.connect(**self._MYSQL_CONF)
        self.cursor = self.conn.cursor()

    def _connect_redis(self):
        """Create (or re-create) the Redis client."""
        self.redis = redis.Redis(**self._REDIS_CONF)

    def reconnect_redis(self):
        """Rebuild the Redis client; on failure log it and pause 2 seconds."""
        try:
            self._connect_redis()
        except Exception as e:
            logger.info(f"[Redis reconnect error] {e}")
            time.sleep(2)

    # ------------------------------------------------------------------
    # Redis queues
    # ------------------------------------------------------------------
    def _push_values(self, key, values, *, left):
        """Push ``values`` onto the list ``key`` and return how many were sent.

        A single string is treated as one value.  An empty collection is a
        no-op, because LPUSH/RPUSH without values is rejected by the server.
        ``left`` selects LPUSH (True) or RPUSH (False).
        """
        if isinstance(values, str):
            values = [values]
        if not values:
            return 0
        push = self.redis.lpush if left else self.redis.rpush
        push(key, *values)
        return len(values)

    @redis_retry(max_retries=3)
    def push_record(self, data: dict):
        """Serialize ``data`` to JSON and LPUSH it onto the error list."""
        raw = json.dumps(data, ensure_ascii=False)
        self.redis.lpush(self.error_list_key, raw)

    @redis_retry(max_retries=3)
    def fetch_from_redis(self, count: int = 100, list_key: Optional[str] = None):
        """Pop up to ``count`` entries from ``list_key`` and decode them.

        Returns:
            A list of ``(raw, decoded)`` tuples; entries that are not valid
            JSON are logged and skipped.  An empty list is returned when the
            list is empty or when a Redis ``ConnectionError`` occurred (the
            client is rebuilt in that case).
        """
        key = list_key
        try:
            raws = self.redis.lpop(key, count)
        except TypeError:
            # redis-py versions whose lpop() takes no count argument: pop one
            # entry at a time, from the same (left) end as the call above.
            raws = []
            for _ in range(count):
                item = self.redis.lpop(key)
                if item is None:
                    break
                raws.append(item)
        except redis.exceptions.ConnectionError as e:
            logger.info(f"[Redis pop error] {e}")
            self.reconnect_redis()
            return []

        if not raws:
            return []
        if isinstance(raws, str):
            raws = [raws]

        out = []
        for raw in raws:
            try:
                out.append((raw, json.loads(raw)))
            except json.JSONDecodeError:
                # The entry is already removed from Redis; leave a trace.
                logger.info(f"[Redis pop] dropping undecodable entry: {raw!r}")
        return out

    @redis_retry(max_retries=3)
    def push_l0(self, raws):
        """LPUSH one or more entries onto the l0 (urgent) queue."""
        pushed = self._push_values(self.l0_list_key, raws, left=True)
        logger.info(f"[写入l0] 已推入 {pushed} 条")

    @redis_retry(max_retries=3)
    def push_l1(self, payloads):
        """RPUSH one or more entries onto the l1 (normal) queue."""
        pushed = self._push_values(self.l1_list_key, payloads, left=False)
        logger.info(f"[写入l1] 已推入 {pushed} 条")

    @redis_retry(max_retries=3)
    def push_l2(self, raws):
        """LPUSH one or more entries onto the l2 (low priority) queue."""
        pushed = self._push_values(self.l2_list_key, raws, left=True)
        logger.info(f"[写入l2] 已推入 {pushed} 条")

    @redis_retry(max_retries=3)
    def get_proxy_agent_dict(self) -> dict:
        """Return the JSON object stored in hash ``proxy_config`` field ``meta``.

        Any Redis error, a missing field or invalid JSON yields ``{}``.
        """
        try:
            meta_json = self.redis.hget("proxy_config", "meta")
        except Exception as e:
            logger.info("[Redis get_proxy_parameter error]", exc_info=e)
            self.reconnect_redis()
            return {}

        if not meta_json:
            return {}

        try:
            return json.loads(meta_json)
        except (json.JSONDecodeError, TypeError) as e:
            logger.warning("[Proxy meta JSON decode error]", exc_info=e)
            return {}

    @mysql_retry()
    def get_proxy_parameter(self, rn: str) -> Optional[str]:
        """Return ``proxy_agent.parameter`` for ``rn``, or None if no row matches."""
        sql = "SELECT parameter FROM proxy_agent WHERE rn = %s LIMIT 1"
        self.cursor.execute(sql, (rn,))
        result = self.cursor.fetchone()
        logger.info(result)
        return result['parameter'] if result else None

    @redis_retry(max_retries=3)
    def item_keyword(self, count: int = 20):
        """Fetch work items from the first non-empty queue, in l0, l1, l2 order.

        Returns:
            ``(items, level)`` where level is 0 for l0, 1 for l1 and 99 for
            l2.  When every queue is empty the result is ``([], 99)``.
        """
        tiers = (
            (self.l0_list_key, 0, "l0"),
            (self.l1_list_key, 1, "l1"),
            (self.l2_list_key, 99, "l2"),
        )
        items = []
        for key, level, tag in tiers:
            try:
                items = self.fetch_from_redis(count, list_key=key)
            except Exception as e:
                logger.info(f"[Redis {tag} pop error] {e}")
                self.reconnect_redis()
                items = []
            if items:
                return items, level
        return items, 99

    @redis_retry(max_retries=3)
    def rollback_l1(self, payloads):
        """Return entries to the l1 queue (RPUSH)."""
        pushed = self._push_values(self.l1_list_key, payloads, left=False)
        logger.info(f"[回滚l1] 已退回 {pushed} 条")

    @redis_retry(max_retries=3)
    def rollback_l0(self, raws):
        """Return entries to the l0 queue (LPUSH)."""
        pushed = self._push_values(self.l0_list_key, raws, left=True)
        logger.info(f"[回滚l0] 已退回 {pushed} 条")

    @redis_retry(max_retries=3)
    def rollback_l2(self, raws):
        """Return entries to the l2 queue (LPUSH)."""
        pushed = self._push_values(self.l2_list_key, raws, left=True)
        logger.info(f"[回滚l2] 已退回 {pushed} 条")

    # ------------------------------------------------------------------
    # MySQL writes / reads
    # ------------------------------------------------------------------
    @mysql_retry()
    def upsert_video(self, data: dict):
        """Insert an op row and upsert the video row for one record.

        ``data`` is modified in place: defaults are filled for ``a_id``,
        ``history_status``, ``is_piracy`` and ``is_repeat``, and ``sort`` is
        set from ``index``.  Nothing is committed here (see :meth:`flush`).
        On any exception the transaction is rolled back and the two
        statements are tried once more; if that also fails the record is
        pushed to the Redis error list via :meth:`push_record`.

        NOTE(review): the broad ``except Exception`` also catches
        ``pymysql.InterfaceError`` raised by ``execute``, so the
        ``mysql_retry`` decorator only sees errors from ``rollback()``.
        """
        logger.info(fr"DB处理->{data.get('v_xid')},\tlevel->{data.get('level')}")
        data.setdefault("a_id", 0)
        data.setdefault("history_status", "")
        data.setdefault("is_piracy", 3)
        data.setdefault("is_repeat", 3)
        data["sort"] = data.get("index", 0)

        max_retries = 1  # one extra try after the first attempt
        for attempt in range(max_retries + 1):
            try:
                self.cursor.execute(self._SQL_INSERT_OP, data)
                self.cursor.execute(self._SQL_UPSERT_VIDEO, data)
                return
            except Exception as e:
                # Undo the uncommitted changes of this attempt.
                self.conn.rollback()
                logger.info(f"[数据库写入异常] {e}")
                logger.info(f"[出错数据]: {data}")

                if attempt < max_retries:
                    logger.info(f"第 {attempt + 2} 次重试…")
                    continue

                # Still failing after the retry: park the record in Redis.
                logger.info("重试失败,将数据写入 Redis 以便后续处理")
                self.push_record(data)
                logger.info("[交由Redis处理]")

    @mysql_retry()
    def fetch_keyword_title(self, level: int = 99):
        """Return keyword/title rows for titles of the given ``level``.

        Selects active keywords joined to active titles that have no entry
        in ``sh_dm_black_keyword``, grouped by keyword and ``rn``.
        """
        sql = """
            SELECT
                k.keyword,
                k.rn,
                t.title AS v_name,
                ANY_VALUE(t.level) AS level
            FROM
                sh_dm_keyword k
            LEFT JOIN
                sh_dm_title t ON k.title = t.title
            WHERE
                k.status = 1
                AND t.status = 1
                AND NOT EXISTS (
                    SELECT 1 FROM sh_dm_black_keyword b WHERE b.title = t.title
                )
                AND t.level = %s
            GROUP BY k.keyword, k.rn
        """
        self.cursor.execute(sql, (level,))
        return self.cursor.fetchall()

    @mysql_retry()
    def log_batch_start(self, info: Dict) -> Optional[int]:
        """Insert a row into ``sh_dm_batch_log`` and commit.

        Args:
            info: Must contain ``batch`` and ``level``; ``count`` is optional
                and defaults to 0.  Level 0 stores the count in ``t0``, level
                1 in ``t1`` and level 9 in ``t2`` (stored as level 2).

        Returns:
            The new row id, or None when the level is unsupported or the
            insert failed.

        Raises:
            ValueError: If ``batch`` or ``level`` is missing.

        NOTE(review): ``item_keyword`` reports the l2 queue as level 99, but
        only 9 is accepted here - confirm which value callers pass.
        """
        batch = info.get("batch")
        level = info.get("level")
        if batch is None or level is None:
            raise ValueError("info 字典必须包含 'batch' 和 'level'")
        count = info.get("count", 0)
        if level == 0:
            t0, t1, t2 = count, 0, 0
        elif level == 1:
            t0, t1, t2 = 0, count, 0
        elif level == 9:
            level = 2
            t0, t1, t2 = 0, 0, count
        else:
            # Previously this fell through to an UnboundLocalError that was
            # swallowed below; report the real reason instead.
            logger.info(f"[log_batch_start] 插入失败:unsupported level {level!r}")
            return None

        start_ts = int(time.time())
        sql = """
            INSERT INTO sh_dm_batch_log
                (batch, info, t0, t1, t2, starttime)
            VALUES (%s, %s, %s, %s, %s, %s)
        """
        try:
            # The (possibly remapped) level is what gets stored in `info`.
            self.cursor.execute(sql, (batch, level, t0, t1, t2, start_ts))
            self.conn.commit()
            return self.cursor.lastrowid
        except Exception as e:
            logger.info(f"[log_batch_start] 插入失败:{e}")
            return None

    @mysql_retry()
    def flush(self):
        """Commit the pending MySQL transaction after a batch of writes."""
        self.conn.commit()

    def close(self):
        """Close cursor, MySQL connection and Redis client.

        Each resource is released even if closing an earlier one raised;
        Redis close errors are logged, MySQL close errors propagate.
        """
        try:
            try:
                if self.cursor:
                    self.cursor.close()
            finally:
                if self.conn:
                    self.conn.close()
        finally:
            try:
                if hasattr(self, "redis") and self.redis:
                    # Prefer close() where the client provides it.
                    if hasattr(self.redis, "close"):
                        self.redis.close()
                    else:
                        self.redis.connection_pool.disconnect()
            except Exception as e:
                logger.info(f"[Redis close error] {e}")

    # ------------------------------------------------------------------
    # proxy configuration / queue state
    # ------------------------------------------------------------------
    @redis_retry(max_retries=3)
    def get_proxy(self, region_code: str) -> str:
        """Return the proxy URL configured for ``region_code``.

        Reads hash ``proxy_config``: field ``list`` is a JSON array of
        entries and field ``conf`` the index of the active entry.  The
        entry's ``data`` mapping is looked up by the upper-cased region code.

        Returns:
            The proxy URL, prefixed with ``http://`` when it carries no
            http/https scheme, or ``""`` when nothing usable is configured.
        """
        region_code = region_code.upper()
        list_json_str = self.redis.hget("proxy_config", "list") or "[]"
        conf_str = self.redis.hget("proxy_config", "conf") or "0"

        try:
            proxies = json.loads(list_json_str)
            entry = proxies[int(conf_str)]
            result = entry.get("data", {}).get(region_code, "")
            if not result:
                # No proxy for this region: do not hand back a bare scheme.
                return ""
            if not result.startswith(("http://", "https://")):
                result = "http://" + result
            return result
        except (ValueError, IndexError, KeyError, TypeError, AttributeError):
            # Malformed configuration of any shape counts as "no proxy".
            return ""

    @redis_retry(max_retries=3)
    def queues_empty(self) -> bool:
        """Return True only if the l0, l1 and l2 queues are all empty."""
        return (
            self.redis.llen(self.l0_list_key) == 0
            and self.redis.llen(self.l1_list_key) == 0
            and self.redis.llen(self.l2_list_key) == 0
        )

    @redis_retry()
    def l0_empty(self) -> bool:
        """Return True if the l0 queue has no entries."""
        return self.redis.llen(self.l0_list_key) == 0

    @redis_retry()
    def l1_empty(self) -> bool:
        """Return True if the l1 queue has no entries."""
        return self.redis.llen(self.l1_list_key) == 0

    @redis_retry()
    def l2_empty(self) -> bool:
        """Return True if the l2 queue has no entries."""
        return self.redis.llen(self.l2_list_key) == 0

    @redis_retry(max_retries=3)
    def pop_error_item(self):
        """LPOP one record from the error list and JSON-decode it.

        Returns None when the list is empty.
        """
        item = self.redis.lpop(self.error_list_key)
        return json.loads(item) if item is not None else None


class DBSA:
    """Class-level write buffer that batches video rows into MySQL.

    All state lives on the class, so every caller in the process shares the
    same buffers.  :meth:`upsert_video` appends rows under ``_lock`` and
    triggers :meth:`flush` once ``FLUSH_EVERY_ROWS`` rows are buffered or
    ``FLUSH_INTERVAL`` seconds have passed since the last flush.
    """

    FLUSH_EVERY_ROWS = 100  # row-count threshold
    FLUSH_INTERVAL = 3  # time threshold in seconds

    _buf_op: List[Dict] = []
    _buf_vid: List[Dict] = []
    _buf_payload: List[Dict] = []
    _last_flush: float = time.time()
    _lock = threading.Lock()

    @staticmethod
    def push_record_many(rows):
        """Fallback hook invoked with the payloads of a failed flush.

        This default only logs how many payloads were handed back; it does
        not store them anywhere.
        """
        logger.info(f"[退回Redis] cnt={len(rows)}")

    @classmethod
    def upsert_video(cls, data):
        """Buffer one record and flush when a threshold is reached.

        ``data`` is deep-copied first, so the caller's dict is not modified.

        Raises:
            KeyError: If ``data`` lacks one of the keys read below.
        """
        data = copy.deepcopy(data)
        data.setdefault("a_id", 0)
        data.setdefault("history_status", "")
        data.setdefault("is_repeat", 3)  # avoids a KeyError further down
        data["sort"] = data.get("index", 0)

        now_ts = int(time.time())
        op_row = {
            "v_id": data["v_id"],
            "v_xid": data["v_xid"],
            "a_id": data["a_id"],
            "level": data["level"],
            "name_title": data["v_name"],
            "keyword": data["keyword"],
            "rn": data["rn"],
            "history_status": data["history_status"],
            "is_repeat": data["is_repeat"],
            "sort": data["sort"],
            "createtime": now_ts,
            "updatetime": now_ts,
            "batch": data["batch"],
            "machine": data["machine_id"],
        }
        vid_row = {
            "v_id": data["v_id"],
            "v_xid": data["v_xid"],
            "rn": data["rn"],
            "v_name": data["v_name"],
            "title": data["title"],
            "link": data["link"],
            "edition": "",
            "duration": data["duration"],
            "public_time": data["create_time"],
            "cover_pic": data["cover_pic"],
            "sort": data["sort"],
            "u_xid": data["u_xid"],
            "u_id": data["u_id"],
            "u_pic": data["u_pic"],
            "u_name": data["u_name"],
            "status": 1,
            "createtime": now_ts,
            "updatetime": now_ts,
            "watch_number": data.get("view", 0),
            "follow_number": data.get("fans", 0),
            "video_number": data.get("videos", 0),
        }

        with cls._lock:
            cls._buf_op.append(op_row)
            cls._buf_vid.append(vid_row)
            cls._buf_payload.append(data)  # keep the full payload for fallback
            buf_len = len(cls._buf_vid)
            overdue = time.time() - cls._last_flush >= cls.FLUSH_INTERVAL
        logger.info(f"DB缓冲 -> xid={data['v_xid']}, level={data['level']}, buffer={buf_len}")

        if buf_len >= cls.FLUSH_EVERY_ROWS:
            flush_reason = "ROWS"
        elif overdue:
            flush_reason = "TIME"
        else:
            return

        logger.info(f"DBSA 落 ({flush_reason}) ...")
        cls.flush()

    @classmethod
    def _bulk_insert(cls, rows: List[Dict], conn=None):
        """Insert ``rows`` into the op table with one multi-row INSERT.

        Runs on ``conn`` when given (the caller owns the transaction),
        otherwise inside its own ``_engine.begin()`` transaction.
        """
        if not rows:
            return
        stmt = video_op.insert().values(rows)
        if conn is not None:
            conn.execute(stmt)
            return
        with _engine.begin() as own_conn:
            own_conn.execute(stmt)

    @classmethod
    def _bulk_upsert(cls, rows: List[Dict], conn=None):
        """Upsert ``rows`` into the video table (ON DUPLICATE KEY UPDATE).

        On a duplicate key only title, duration, cover_pic, sort and
        updatetime are overwritten.  Runs on ``conn`` when given, otherwise
        inside its own transaction.
        """
        if not rows:
            return
        stmt = mysql_insert(video).values(rows)
        upd = {
            "title": stmt.inserted.title,
            "duration": stmt.inserted.duration,
            "cover_pic": stmt.inserted.cover_pic,
            "sort": stmt.inserted.sort,
            "updatetime": stmt.inserted.updatetime,
        }
        ondup = stmt.on_duplicate_key_update(**upd)
        if conn is not None:
            conn.execute(ondup)
            return
        with _engine.begin() as own_conn:
            own_conn.execute(ondup)

    @classmethod
    def flush(cls):
        """Write all buffered rows to MySQL in a single transaction.

        The buffers are swapped out under the lock, then both tables are
        written inside one transaction so a failure leaves neither table
        partially updated.  On failure the original payloads are passed to
        :meth:`push_record_many`; exceptions are logged, not raised.
        """
        with cls._lock:
            op_rows = cls._buf_op[:]
            vid_rows = cls._buf_vid[:]
            payloads = cls._buf_payload[:]
            cls._buf_op.clear()
            cls._buf_vid.clear()
            cls._buf_payload.clear()
            cls._last_flush = time.time()

        if not op_rows and not vid_rows:
            return

        # Defensive: drop keys that are not columns of the video table.
        for row in vid_rows:
            row.pop("is_repeat", None)
            row.pop("level", None)

        start = time.time()
        try:
            with _engine.begin() as conn:
                cls._bulk_insert(op_rows, conn)
                cls._bulk_upsert(vid_rows, conn)
            logger.info(f"[DBSA] 成 op={len(op_rows)} video={len(vid_rows)} time={time.time() - start:.3f}s")
        except Exception as e:
            logger.info(f"[DBSA] flush FAIL: {e} op={len(op_rows)} video={len(vid_rows)}")
            # Hand back the original payloads, which carry every field.
            try:
                cls.push_record_many(payloads)
            except Exception as fallback_err:
                logger.info(f"[Redis 回退失败] {fallback_err}")

    @classmethod
    def update_video_stats(cls, locator: dict, stats: dict) -> int:
        """Update columns of the video row identified by ``v_xid`` and ``rn``.

        Keys of ``stats`` that are not columns of the video table are
        ignored; ``updatetime`` is always set to the current time.

        Returns:
            The number of rows matched by the UPDATE.

        Raises:
            ValueError: If ``locator`` lacks ``v_xid`` or ``rn``.
        """
        v_xid = locator.get("v_xid")
        rn = locator.get("rn")
        if not v_xid or not rn:
            raise ValueError("locator 必须包含 'v_xid' 和 'rn'")

        params = dict(stats)
        params["updatetime"] = int(time.time())
        # Keep only keys that are real columns of the video table.
        valid_cols = set(video.c.keys())
        filtered_params = {k: v for k, v in params.items() if k in valid_cols}

        stmt = (
            video
            .update()
            .where(video.c.v_xid == v_xid, video.c.rn == rn)
            .values(**filtered_params)
        )
        with _engine.begin() as conn:
            result = conn.execute(stmt)
            return result.rowcount

    @classmethod
    def update_video_stats_async(cls, locator: dict, stats: dict) -> None:
        """Run :meth:`update_video_stats` on a daemon thread and return at once.

        The result and any exception of the update are not reported back to
        the caller.
        """
        thread = threading.Thread(
            target=cls.update_video_stats,
            args=(locator, stats),
            daemon=True,
        )
        thread.start()