feat: 添加视频操作和视频表结构,支持批量插入和更新功能
This commit is contained in:
parent
c33bffa392
commit
bcc4751328
159
DB.py
159
DB.py
@ -5,6 +5,67 @@ import time
|
|||||||
import functools
|
import functools
|
||||||
from redis.exceptions import ConnectionError, TimeoutError
|
from redis.exceptions import ConnectionError, TimeoutError
|
||||||
|
|
||||||
|
from sqlalchemy import (
|
||||||
|
create_engine, MetaData, Table, Column,
|
||||||
|
BigInteger, Integer, String, Text
|
||||||
|
)
|
||||||
|
from sqlalchemy.dialects.mysql import insert as mysql_insert
|
||||||
|
from typing import List, Dict
|
||||||
|
import time, copy, queue
|
||||||
|
|
||||||
|
MYSQL_URL = (
|
||||||
|
"mysql+pymysql://db_vidcon:rexdK4fhCCiRE4BZ"
|
||||||
|
"@192.144.230.75:3306/db_vidcon?charset=utf8mb4"
|
||||||
|
)
|
||||||
|
|
||||||
|
_engine = create_engine(
|
||||||
|
MYSQL_URL,
|
||||||
|
pool_size=20, max_overflow=10,
|
||||||
|
pool_pre_ping=True, pool_recycle=3600,
|
||||||
|
future=True,
|
||||||
|
)
|
||||||
|
_meta = MetaData()
|
||||||
|
|
||||||
|
video_op = Table(
|
||||||
|
"sh_dm_video_op_v2", _meta,
|
||||||
|
Column("v_id", BigInteger, primary_key=True),
|
||||||
|
Column("v_xid", String(64)),
|
||||||
|
Column("a_id", Integer),
|
||||||
|
Column("level", Integer),
|
||||||
|
Column("name_title", String(255)),
|
||||||
|
Column("keyword", String(255)),
|
||||||
|
Column("rn", String(8)),
|
||||||
|
Column("history_status", String(32)),
|
||||||
|
Column("is_repeat", Integer),
|
||||||
|
Column("sort", Integer),
|
||||||
|
Column("createtime", Integer),
|
||||||
|
Column("updatetime", Integer),
|
||||||
|
Column("batch", BigInteger),
|
||||||
|
Column("machine", Integer),
|
||||||
|
)
|
||||||
|
|
||||||
|
video = Table(
|
||||||
|
"sh_dm_video_v2", _meta,
|
||||||
|
Column("v_id", BigInteger, primary_key=True),
|
||||||
|
Column("v_xid", String(64)),
|
||||||
|
Column("rn", String(8)),
|
||||||
|
Column("v_name", String(255)),
|
||||||
|
Column("title", String(255)),
|
||||||
|
Column("link", Text),
|
||||||
|
Column("edition", String(64)),
|
||||||
|
Column("duration", Integer),
|
||||||
|
Column("public_time", String(32)),
|
||||||
|
Column("cover_pic", Text),
|
||||||
|
Column("sort", Integer),
|
||||||
|
Column("u_xid", String(64)),
|
||||||
|
Column("u_id", BigInteger),
|
||||||
|
Column("u_pic", Text),
|
||||||
|
Column("u_name", String(255)),
|
||||||
|
Column("status", Integer),
|
||||||
|
Column("createtime", Integer),
|
||||||
|
Column("updatetime", Integer),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def mysql_retry(max_retries: int = 3, base_delay: float = 2.0):
|
def mysql_retry(max_retries: int = 3, base_delay: float = 2.0):
|
||||||
"""
|
"""
|
||||||
@ -371,3 +432,101 @@ class DBVidcon:
|
|||||||
item = self.redis.lpop(self.error_list_key)
|
item = self.redis.lpop(self.error_list_key)
|
||||||
# 如果你存入的是 JSON 字符串,可以在这里做一次反序列化:
|
# 如果你存入的是 JSON 字符串,可以在这里做一次反序列化:
|
||||||
return json.loads(item) if item is not None else None
|
return json.loads(item) if item is not None else None
|
||||||
|
|
||||||
|
|
||||||
|
class DBSA:
|
||||||
|
BUFFER_SIZE = 1000 # 满 1000 条写库
|
||||||
|
_buf_op: List[Dict] = []
|
||||||
|
_buf_vid: List[Dict] = []
|
||||||
|
|
||||||
|
push_record = staticmethod(lambda d: print("[假装推 Redis]", d))
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def upsert_video(cls, data: Dict):
|
||||||
|
data = copy.deepcopy(data)
|
||||||
|
data.setdefault("a_id", 0)
|
||||||
|
data.setdefault("history_status", "")
|
||||||
|
data.setdefault("is_repeat", 3)
|
||||||
|
data["sort"] = data.get("index", 0)
|
||||||
|
|
||||||
|
op_row = {
|
||||||
|
"v_id": data["v_id"],
|
||||||
|
"v_xid": data["v_xid"],
|
||||||
|
"a_id": data["a_id"],
|
||||||
|
"level": data["level"],
|
||||||
|
"name_title": data["v_name"],
|
||||||
|
"keyword": data["keyword"],
|
||||||
|
"rn": data["rn"],
|
||||||
|
"history_status": data["history_status"],
|
||||||
|
"is_repeat": data["is_repeat"],
|
||||||
|
"sort": data["sort"],
|
||||||
|
"createtime": int(time.time()),
|
||||||
|
"updatetime": int(time.time()),
|
||||||
|
"batch": data["batch"],
|
||||||
|
"machine": data["machine_id"],
|
||||||
|
}
|
||||||
|
cls._buf_op.append(op_row)
|
||||||
|
|
||||||
|
vid_row = {
|
||||||
|
"v_id": data["v_id"],
|
||||||
|
"v_xid": data["v_xid"],
|
||||||
|
"rn": data["rn"],
|
||||||
|
"v_name": data["v_name"],
|
||||||
|
"title": data["title"],
|
||||||
|
"link": data["link"],
|
||||||
|
"edition": "",
|
||||||
|
"duration": data["duration"],
|
||||||
|
"public_time": data["create_time"],
|
||||||
|
"cover_pic": data["cover_pic"],
|
||||||
|
"sort": data["sort"],
|
||||||
|
"u_xid": data["u_xid"],
|
||||||
|
"u_id": data["u_id"],
|
||||||
|
"u_pic": data["u_pic"],
|
||||||
|
"u_name": data["u_name"],
|
||||||
|
"status": 1,
|
||||||
|
"createtime": int(time.time()),
|
||||||
|
"updatetime": int(time.time()),
|
||||||
|
}
|
||||||
|
cls._buf_vid.append(vid_row)
|
||||||
|
|
||||||
|
if len(cls._buf_op) >= cls.BUFFER_SIZE:
|
||||||
|
cls.flush()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _bulk_insert_op(cls):
|
||||||
|
if not cls._buf_op:
|
||||||
|
return
|
||||||
|
stmt = video_op.insert().values(cls._buf_op)
|
||||||
|
with _engine.begin() as conn:
|
||||||
|
conn.execute(stmt)
|
||||||
|
cls._buf_op.clear()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _bulk_upsert_vid(cls):
|
||||||
|
if not cls._buf_vid:
|
||||||
|
return
|
||||||
|
stmt = mysql_insert(video).values(cls._buf_vid)
|
||||||
|
upd = {
|
||||||
|
"title": stmt.inserted.title,
|
||||||
|
"duration": stmt.inserted.duration,
|
||||||
|
"cover_pic": stmt.inserted.cover_pic,
|
||||||
|
"sort": stmt.inserted.sort,
|
||||||
|
"updatetime": stmt.inserted.updatetime,
|
||||||
|
}
|
||||||
|
ondup = stmt.on_duplicate_key_update(**upd)
|
||||||
|
with _engine.begin() as conn:
|
||||||
|
conn.execute(ondup)
|
||||||
|
cls._buf_vid.clear()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def flush(cls):
|
||||||
|
try:
|
||||||
|
cls._bulk_insert_op()
|
||||||
|
cls._bulk_upsert_vid()
|
||||||
|
except Exception as e:
|
||||||
|
print("[DBSA] 批写失败:", e)
|
||||||
|
# 将两段缓冲重推 Redis(与旧逻辑一致)
|
||||||
|
for row in cls._buf_vid:
|
||||||
|
cls.push_record(row)
|
||||||
|
cls._buf_op.clear()
|
||||||
|
cls._buf_vid.clear()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user