# DailyMotion/DB.py

import copy
import functools
import json
import threading
import time
from typing import Dict, List, Optional
import pymysql
import redis
from redis.exceptions import ConnectionError, TimeoutError
from sqlalchemy import (
create_engine, MetaData, Table, Column,
BigInteger, Integer, String, Text
)
from sqlalchemy.dialects.mysql import insert as mysql_insert
from logger import logger
MYSQL_URL = (
"mysql+pymysql://db_vidcon:rexdK4fhCCiRE4BZ"
"@192.144.230.75:3306/db_vidcon?charset=utf8mb4"
)
_engine = create_engine(
MYSQL_URL,
pool_size=20, max_overflow=10,
pool_pre_ping=True, pool_recycle=3600,
future=True,
)
_meta = MetaData()
video_op = Table("sh_dm_video_op_v2", _meta,
Column("v_id", BigInteger, primary_key=True),
Column("v_xid", String(64)),
Column("a_id", Integer),
Column("level", Integer),
Column("name_title", String(255)),
Column("keyword", String(255)),
Column("rn", String(8)),
Column("history_status", String(32)),
Column("is_repeat", Integer),
Column("sort", Integer),
Column("createtime", Integer),
Column("updatetime", Integer),
Column("batch", BigInteger),
Column("machine", Integer),
)
video = Table("sh_dm_video_v2", _meta,
Column("v_id", BigInteger, primary_key=True),
Column("v_xid", String(64)),
Column("rn", String(8)),
Column("v_name", String(255)),
Column("title", String(255)),
Column("link", Text),
Column("edition", String(64)),
Column("duration", Integer),
Column("public_time", String(32)),
Column("cover_pic", Text),
Column("sort", Integer),
Column("u_xid", String(64)),
Column("u_id", BigInteger),
Column("u_pic", Text),
Column("u_name", String(255)),
Column("status", Integer),
Column("createtime", Integer),
Column("updatetime", Integer),
)
def mysql_retry(max_retries: int = 3, base_delay: float = 2.0):
    """
    Decorator factory: on pymysql.InterfaceError, reconnect and retry.
    The wait between attempts grows exponentially: base_delay * 2**(attempt-1) seconds.
    """
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(self, *args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    # Keep the connection alive; reconnect=True re-establishes it if ping fails
                    self.conn.ping(reconnect=True)
                    return fn(self, *args, **kwargs)
                except pymysql.InterfaceError as e:
                    wait = base_delay * (2 ** (attempt - 1))
                    logger.info(f"[MySQL][{fn.__name__}] attempt {attempt} hit InterfaceError: {e}, reconnecting in {wait:.1f}s…")
                    time.sleep(wait)
                    self._reconnect_mysql()
                    if attempt == max_retries:
                        logger.info("[MySQL] still failing after retries, re-raising")
                        raise
        return wrapper
    return decorator
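# Illustrative note (not from the original source): with the defaults max_retries=3 and
# base_delay=2.0, a decorated method waits 2.0s, 4.0s and 8.0s after successive
# InterfaceErrors before the exception from the last attempt is re-raised, e.g.:
#
#     @mysql_retry()
#     def get_proxy_parameter(self, rn): ...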
def redis_retry(max_retries: int = 3):
    """
    Decorator factory: retry Redis operations up to max_retries times on connection errors.
    """
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(self, *args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return fn(self, *args, **kwargs)
                except (ConnectionError, TimeoutError) as e:
                    logger.info(f"[Redis][{fn.__name__}] attempt {attempt} failed: {e}")
                    self.reconnect_redis()
                    if attempt == max_retries:
                        logger.info("[Redis] giving up after repeated connection failures")
                        raise
                    logger.info(f"[Redis] reconnected, retrying (attempt {attempt + 1})…")
        return wrapper
    return decorator
class DBVidcon:
_MYSQL_CONF = {
"host": "192.144.230.75", # "127.0.0.1", #
"port": 3306, # 3307, #
"user": "db_vidcon",
"password": "rexdK4fhCCiRE4BZ",
"database": "db_vidcon",
"charset": "utf8mb4",
"cursorclass": pymysql.cursors.DictCursor,
}
_REDIS_CONF = {
"host": "192.144.230.75", # "127.0.0.1", #
"port": 6379, # 6380, #
"password": "qwert@$123!&",
"decode_responses": True,
}
def __init__(self):
self.l0_list_key = "video_l0_queue"
self.l1_list_key = "video_l1_queue"
self.l2_list_key = "video_l2_queue"
self.error_list_key = "error_save_queue"
self.conn = pymysql.connect(**self._MYSQL_CONF)
self.cursor = self.conn.cursor()
self.redis = redis.Redis(**self._REDIS_CONF)
    def _connect_redis(self):
        """Create (or re-create) the Redis client."""
        self.redis = redis.Redis(**self._REDIS_CONF)
    def reconnect_redis(self):
        """Reconnect Redis after a ConnectionError is caught."""
        try:
            self._connect_redis()
        except Exception as e:
            logger.info(f"[Redis reconnect error] {e}")
            time.sleep(2)
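    # Note: the @mysql_retry decorator calls self._reconnect_mysql(), which this class does
    # not define elsewhere; the following is a minimal sketch (an assumption, not the original
    # implementation) that rebuilds the MySQL connection and cursor from _MYSQL_CONF.
    def _reconnect_mysql(self):
        """Re-create the MySQL connection and cursor after an InterfaceError."""
        try:
            if self.conn:
                self.conn.close()
        except Exception:
            pass
        self.conn = pymysql.connect(**self._MYSQL_CONF)
        self.cursor = self.conn.cursor()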
@redis_retry(max_retries=3)
def push_record(self, data: dict):
raw = json.dumps(data, ensure_ascii=False)
self.redis.lpush(self.error_list_key, raw)
@redis_retry(max_retries=3)
def fetch_from_redis(self, count: int = 100, list_key: str = None):
key = list_key
        try:
            raws = self.redis.lpop(key, count)
        except TypeError:
            # Older redis-py versions do not accept a count argument: fall back to popping
            # items one at a time from the same (left) end of the list.
            raws = []
            for _ in range(count):
                item = self.redis.lpop(key)
                if item is None:
                    break
                raws.append(item)
        except redis.exceptions.ConnectionError as e:
            logger.info(f"[Redis pop error] {e}")
            self.reconnect_redis()
            return []
if not raws:
return []
if isinstance(raws, str):
raws = [raws]
out = []
for raw in raws:
try:
out.append((raw, json.loads(raw)))
except json.JSONDecodeError:
continue
return out
    @redis_retry(max_retries=3)
    def push_l0(self, raws):
        """Push data onto the l0 (urgent) queue."""
        if isinstance(raws, str):
            raws = [raws]
        self.redis.lpush(self.l0_list_key, *raws)
        logger.info(f"[push l0] queued {len(raws)} item(s)")
    @redis_retry(max_retries=3)
    def push_l1(self, payloads):
        """Push data onto the l1 (normal) queue."""
        if isinstance(payloads, str):
            payloads = [payloads]
        self.redis.rpush(self.l1_list_key, *payloads)
        logger.info(f"[push l1] queued {len(payloads)} item(s)")
    @redis_retry(max_retries=3)
    def push_l2(self, raws):
        """Push data onto the l2 (low-priority) queue."""
        if isinstance(raws, str):
            raws = [raws]
        self.redis.lpush(self.l2_list_key, *raws)
        logger.info(f"[push l2] queued {len(raws)} item(s)")
@mysql_retry()
def get_proxy_agent_dict(self) -> dict:
sql = "SELECT rn, parameter FROM proxy_agent"
self.cursor.execute(sql)
rows = self.cursor.fetchall()
result = {row['rn']: row['parameter'] for row in rows}
return result
@mysql_retry()
def get_proxy_parameter(self, rn: str) -> str:
sql = "SELECT parameter FROM proxy_agent WHERE rn = %s LIMIT 1"
self.cursor.execute(sql, (rn,))
result = self.cursor.fetchone()
logger.info(result)
return result['parameter'] if result else None
@redis_retry(max_retries=3)
def item_keyword(self, count: int = 20):
try:
items = self.fetch_from_redis(count, list_key=self.l0_list_key)
except Exception as e:
logger.info("[Redis l0 pop error]", e)
self.reconnect_redis()
items = []
if items:
return items, 0
try:
items = self.fetch_from_redis(count, list_key=self.l1_list_key)
except Exception as e:
logger.info("[Redis l1 pop error]", e)
self.reconnect_redis()
items = []
if items:
return items, 1
try:
items = self.fetch_from_redis(count, list_key=self.l2_list_key)
except Exception as e:
logger.info("[Redis l2 pop error]", e)
self.reconnect_redis()
items = []
return items, 99
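    # Illustrative consumer sketch (hypothetical caller code, not part of this module): the
    # second return value identifies the queue the batch came from (0 = l0, 1 = l1, 99 = l2
    # or nothing available), and each item is a (raw_json, parsed_dict) pair.
    #
    #     db = DBVidcon()
    #     items, flag = db.item_keyword(count=20)
    #     for raw, payload in items:
    #         handle(payload)          # handle() is a placeholder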
@redis_retry(max_retries=3)
def rollback_l1(self, payloads):
if isinstance(payloads, str):
payloads = [payloads]
self.redis.rpush(self.l1_list_key, *payloads)
logger.info(f"[回滚l1] 已退回 {len(payloads)}")
@redis_retry(max_retries=3)
def rollback_l0(self, raws):
if isinstance(raws, str):
raws = [raws]
self.redis.lpush(self.l0_list_key, *raws)
logger.info(f"[回滚l0] 已退回 {len(raws)}")
@redis_retry(max_retries=3)
def rollback_l2(self, raws):
if isinstance(raws, str):
raws = [raws]
self.redis.lpush(self.l2_list_key, *raws)
logger.info(f"[回滚l2] 已退回 {len(raws)}")
@mysql_retry()
def upsert_video(self, data: dict):
logger.info(fr"DB处理->{data.get('v_xid')},\tlevel->{data.get('level')}")
data.setdefault("a_id", 0)
data.setdefault("history_status", "")
data.setdefault("is_piracy", 3)
data.setdefault("is_repeat", 3)
data["sort"] = data.get("index", 0)
        max_retries = 1  # retry once more after the initial attempt
attempt = 0
while True:
try:
sql_op = """
INSERT INTO sh_dm_video_op_v2 (
v_id, v_xid, a_id, level, name_title,
keyword, rn, history_status, is_repeat,
sort, createtime, updatetime, batch, machine
) VALUES (
%(v_id)s, %(v_xid)s, %(a_id)s, %(level)s, %(v_name)s,
%(keyword)s, %(rn)s, %(history_status)s, %(is_repeat)s,
%(sort)s, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), %(batch)s, %(machine_id)s
)
"""
self.cursor.execute(sql_op, data)
sql_update = """
INSERT INTO sh_dm_video_v2 (
v_id, v_xid, rn, v_name, title, link,
edition, duration,
public_time, cover_pic, sort,
u_xid, u_id, u_pic, u_name,
status, createtime, updatetime
) VALUES (
%(v_id)s, %(v_xid)s, %(rn)s, %(v_name)s, %(title)s, %(link)s,
'', %(duration)s,
%(create_time)s, %(cover_pic)s, %(sort)s,
%(u_xid)s, %(u_id)s, %(u_pic)s, %(u_name)s,
1, UNIX_TIMESTAMP(), UNIX_TIMESTAMP()
)
ON DUPLICATE KEY UPDATE
title = VALUES(title),
duration = VALUES(duration),
cover_pic = VALUES(cover_pic),
sort = VALUES(sort),
updatetime = UNIX_TIMESTAMP();
"""
self.cursor.execute(sql_update, data)
                break  # success: leave the retry loop
            except Exception as e:
                # roll back the uncommitted changes from this attempt
                self.conn.rollback()
                logger.info(f"[MySQL write error] {e}")
                logger.info(f"[failing data]: {data}")
                if attempt < max_retries:
                    attempt += 1
                    logger.info(f"retrying (attempt {attempt + 1})…")
                    continue
                else:
                    # still failing after the retry: hand the record to Redis for later processing
                    logger.info("retry failed, pushing the record to Redis for later processing")
                    self.push_record(data)
                    logger.info("[handed over to Redis]")
                    break
@mysql_retry()
def fetch_keyword_title(self, level: int = 99):
sql = """
SELECT
k.keyword,
k.rn,
t.title AS v_name,
ANY_VALUE(t.level) AS level
FROM
sh_dm_keyword k
LEFT JOIN
sh_dm_title t ON k.title = t.title
WHERE
k.status = 1
AND t.status = 1
AND NOT EXISTS (
SELECT 1 FROM sh_dm_black_keyword b WHERE b.title = t.title
)
AND t.level = %s
GROUP BY k.keyword, k.rn
"""
self.cursor.execute(sql, (level,))
return self.cursor.fetchall()
@mysql_retry()
    def log_batch_start(self, info: Dict) -> Optional[int]:
        batch = info.get("batch")
        level = info.get("level")
        if batch is None or level is None:
            raise ValueError("info must contain both 'batch' and 'level'")
        count = info.get("count", 0)
        if level == 0:
            t0, t1, t2 = count, 0, 0
        elif level == 1:
            t0, t1, t2 = 0, count, 0
        elif level == 9:
            level = 2
            t0, t1, t2 = 0, 0, count
        else:
            # any other level value would leave t0/t1/t2 undefined (NameError), so default to zeros
            t0, t1, t2 = 0, 0, 0
start_ts = int(time.time())
sql = """
INSERT INTO sh_dm_batch_log
(batch, info, t0, t1, t2, starttime)
VALUES (%s, %s, %s, %s, %s, %s)
"""
try:
self.cursor.execute(sql, (batch, level, t0, t1, t2, start_ts))
self.conn.commit()
return self.cursor.lastrowid
except Exception as e:
logger.info(f"[log_batch_start] 插入失败:{e}")
return None
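    # Illustrative call (placeholder values): 'batch' and 'level' are required, 'count'
    # defaults to 0; level 0/1 fill t0/t1, and level 9 is stored as 2 and fills t2.
    #
    #     db.log_batch_start({"batch": 1718000000, "level": 0, "count": 500})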
@mysql_retry()
def flush(self):
"""批量执行完后手动提交。"""
self.conn.commit()
def close(self):
try:
if self.cursor:
self.cursor.close()
finally:
if self.conn:
self.conn.close()
try:
if hasattr(self, "redis") and self.redis:
                # redis-py >= 4.2 recommends calling .close() directly
if hasattr(self.redis, "close"):
self.redis.close()
else:
self.redis.connection_pool.disconnect()
except Exception as e:
logger.info("[Redis close error]", e)
@redis_retry(max_retries=3)
def get_proxy(self, region_code: str) -> str:
"""
从 Redis 队列 proxy_queue:<region_code> 弹出一个代理并返回。
如果队列为空,阻塞
"""
proxy = ""
while True:
key = f"proxy_queue:{region_code}"
proxy = self.redis.lpop(key)
if proxy is None:
time.sleep(10)
else:
break
return proxy
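    # Illustrative usage (region code is a placeholder): a separate producer is assumed to
    # push proxies onto proxy_queue:<region_code>; this call blocks, polling every 10
    # seconds, until one is available.
    #
    #     proxy = db.get_proxy("us")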
@redis_retry(max_retries=3)
def queues_empty(self) -> bool:
"""
如果都空,返回 True只要有一个不空就返回 False。
"""
return (
self.redis.llen(self.l0_list_key) == 0
and self.redis.llen(self.l1_list_key) == 0
and self.redis.llen(self.l2_list_key) == 0
)
@redis_retry()
def l0_empty(self) -> bool:
return self.redis.llen(self.l0_list_key) == 0
@redis_retry()
def l1_empty(self) -> bool:
return self.redis.llen(self.l1_list_key) == 0
@redis_retry()
def l2_empty(self) -> bool:
return self.redis.llen(self.l2_list_key) == 0
@redis_retry(max_retries=3)
def pop_error_item(self):
"""
从 error_list_key 中弹出一个错误记录lpop
如果队列为空,返回 None。
"""
item = self.redis.lpop(self.error_list_key)
# 如果你存入的是 JSON 字符串,可以在这里做一次反序列化:
return json.loads(item) if item is not None else None
class DBSA:
    FLUSH_EVERY_ROWS = 100   # flush once this many rows are buffered
    FLUSH_INTERVAL = 3       # flush at least every N seconds
    _buf_op: List[Dict] = []
    _buf_vid: List[Dict] = []
    _buf_payload: List[Dict] = []
    _last_flush: float = time.time()
    _lock = threading.Lock()
    push_record_many = staticmethod(
        lambda rows: logger.info(f"[pushed back to Redis] cnt={len(rows)}")
    )
@classmethod
def upsert_video(cls, data):
data = copy.deepcopy(data)
data.setdefault("a_id", 0)
data.setdefault("history_status", "")
data.setdefault("is_repeat", 3) # 避免 KeyError
data["sort"] = data.get("index", 0)
now_ts = int(time.time())
op_row = {
"v_id": data["v_id"], "v_xid": data["v_xid"], "a_id": data["a_id"],
"level": data["level"], "name_title": data["v_name"],
"keyword": data["keyword"], "rn": data["rn"],
"history_status": data["history_status"], "is_repeat": data["is_repeat"],
"sort": data["sort"], "createtime": now_ts, "updatetime": now_ts,
"batch": data["batch"], "machine": data["machine_id"],
}
vid_row = {
"v_id": data["v_id"], "v_xid": data["v_xid"], "rn": data["rn"],
"v_name": data["v_name"], "title": data["title"], "link": data["link"],
"edition": "", "duration": data["duration"],
"public_time": data["create_time"], "cover_pic": data["cover_pic"],
"sort": data["sort"], "u_xid": data["u_xid"], "u_id": data["u_id"],
"u_pic": data["u_pic"], "u_name": data["u_name"],
"status": 1, "createtime": now_ts, "updatetime": now_ts,
}
with cls._lock:
cls._buf_op.append(op_row)
cls._buf_vid.append(vid_row)
            cls._buf_payload.append(data)  # keep the original payload for possible rollback
buf_len = len(cls._buf_vid)
logger.info(f"DB缓冲 -> xid={data['v_xid']}, level={data['level']}, buffer={buf_len}")
need_flush = False
flush_reason = ""
if buf_len >= cls.FLUSH_EVERY_ROWS:
need_flush = True
flush_reason = "ROWS"
elif time.time() - cls._last_flush >= cls.FLUSH_INTERVAL:
need_flush = True
flush_reason = "TIME"
if need_flush:
logger.info(f"DBSA 落 ({flush_reason}) ...")
cls.flush()
@classmethod
def _bulk_insert(cls, rows: List[Dict]):
if not rows:
return
stmt = video_op.insert().values(rows)
with _engine.begin() as conn:
conn.execute(stmt)
@classmethod
def _bulk_upsert(cls, rows: List[Dict]):
if not rows:
return
stmt = mysql_insert(video).values(rows)
upd = {
"title": stmt.inserted.title,
"duration": stmt.inserted.duration,
"cover_pic": stmt.inserted.cover_pic,
"sort": stmt.inserted.sort,
"updatetime": stmt.inserted.updatetime,
}
ondup = stmt.on_duplicate_key_update(**upd)
with _engine.begin() as conn:
conn.execute(ondup)
@classmethod
def flush(cls):
with cls._lock:
op_rows = cls._buf_op[:]
vid_rows = cls._buf_vid[:]
payloads = cls._buf_payload[:]
cls._buf_op.clear()
cls._buf_vid.clear()
cls._buf_payload.clear()
cls._last_flush = time.time()
if not op_rows and not vid_rows:
return
for r in vid_rows:
r.pop("is_repeat", None)
r.pop("level", None)
start = time.time()
try:
cls._bulk_insert(op_rows)
cls._bulk_upsert(vid_rows)
logger.info(f"[DBSA] 成 op={len(op_rows)} video={len(vid_rows)} time={time.time() - start:.3f}s")
except Exception as e:
logger.info(f"[DBSA] flush FAIL: {e} op={len(op_rows)} video={len(vid_rows)}")
            # push the original payloads (the most complete form of the data) back in bulk
            try:
                cls.push_record_many(payloads)
            except Exception as re:
                logger.info(f"[Redis rollback failed] {re}")
@classmethod
def update_video_stats(cls, locator:dict, stats:dict) -> int:
"""
立即更新 sh_dm_video_v2 表中的统计字段。n
:param locator: 用于定位行的字典,必须包含: v_xid, rn
:param stats: 需要更新的统计字段,如 {"fans": 633, "videos": 10090, "view": 1678408}
:return: 受影响的行数
"""
v_xid = locator.get("v_xid")
rn = locator.get("rn")
if not v_xid or not rn:
raise ValueError("locator 必须包含 'v_xid''rn'")
params = dict(stats)
params["updatetime"] = int(time.time())
        # keep only keys that are actual columns of the video table
valid_cols = set(video.c.keys())
filtered_params = {k: v for k, v in params.items() if k in valid_cols}
stmt = (
video
.update()
.where(video.c.v_xid == v_xid, video.c.rn == rn)
.values(**filtered_params)
)
with _engine.begin() as conn:
result = conn.execute(stmt)
return result.rowcount
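    # Illustrative call (placeholder values): keys in stats that are not columns of
    # sh_dm_video_v2 are silently dropped by the valid_cols filter above.
    #
    #     DBSA.update_video_stats({"v_xid": "x8abcd", "rn": "US"}, {"duration": 3600})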
@classmethod
def update_video_stats_async(cls, locator:dict, stats:dict) -> None:
"""
异步更新 sh_dm_video_v2 表中的统计字段,立即返回,不阻塞调用线程。
"""
thread = threading.Thread(
target=cls.update_video_stats,
args=(locator, stats),
daemon=True
)
thread.start()
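# Minimal usage sketch for DBSA (hypothetical field values, kept as a comment so nothing
# runs on import): buffered rows are flushed automatically once FLUSH_EVERY_ROWS or
# FLUSH_INTERVAL is reached, or explicitly via flush().
#
#     DBSA.upsert_video({
#         "v_id": 1, "v_xid": "x8abcd", "rn": "US", "v_name": "keyword title",
#         "title": "Some video", "link": "https://www.dailymotion.com/video/x8abcd",
#         "duration": 120, "create_time": "2024-01-01 00:00:00", "cover_pic": "",
#         "u_xid": "u1", "u_id": 1, "u_pic": "", "u_name": "uploader",
#         "keyword": "kw", "level": 0, "batch": 1718000000, "machine_id": 1, "index": 0,
#     })
#     DBSA.flush()   # force a flush instead of waiting for the ROWS/TIME thresholds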