"""SQLAlchemy table definitions and a buffered batch writer for the video
crawl pipeline (added to DB.py by patch bcc4751).

Defines:
  - ``video_op`` / ``video``: Table metadata for ``sh_dm_video_op_v2`` and
    ``sh_dm_video_v2``.
  - ``DBSA``: class-level buffered writer that batches rows and flushes them
    to MySQL (plain bulk INSERT for op rows, INSERT ... ON DUPLICATE KEY
    UPDATE for video rows) once BUFFER_SIZE rows accumulate.
"""

from typing import Dict, List
import copy
import queue  # kept: added by the original patch; may be used elsewhere in DB.py
import time

from sqlalchemy import (
    BigInteger,
    Column,
    Integer,
    MetaData,
    String,
    Table,
    Text,
    create_engine,
)
from sqlalchemy.dialects.mysql import insert as mysql_insert

# NOTE(review): credentials are hard-coded in source control — move to
# environment/config before this ships.
MYSQL_URL = (
    "mysql+pymysql://db_vidcon:rexdK4fhCCiRE4BZ"
    "@192.144.230.75:3306/db_vidcon?charset=utf8mb4"
)

_engine = create_engine(
    MYSQL_URL,
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,   # validate pooled connections before handing them out
    pool_recycle=3600,    # recycle hourly to avoid MySQL wait_timeout drops
    future=True,
)
_meta = MetaData()

# One row per crawl operation performed on a video (audit/history table).
video_op = Table(
    "sh_dm_video_op_v2", _meta,
    Column("v_id", BigInteger, primary_key=True),
    Column("v_xid", String(64)),
    Column("a_id", Integer),
    Column("level", Integer),
    Column("name_title", String(255)),
    Column("keyword", String(255)),
    Column("rn", String(8)),
    Column("history_status", String(32)),
    Column("is_repeat", Integer),
    Column("sort", Integer),
    Column("createtime", Integer),
    Column("updatetime", Integer),
    Column("batch", BigInteger),
    Column("machine", Integer),
)

# Canonical video record; upserted on duplicate primary key.
video = Table(
    "sh_dm_video_v2", _meta,
    Column("v_id", BigInteger, primary_key=True),
    Column("v_xid", String(64)),
    Column("rn", String(8)),
    Column("v_name", String(255)),
    Column("title", String(255)),
    Column("link", Text),
    Column("edition", String(64)),
    Column("duration", Integer),
    Column("public_time", String(32)),
    Column("cover_pic", Text),
    Column("sort", Integer),
    Column("u_xid", String(64)),
    Column("u_id", BigInteger),
    Column("u_pic", Text),
    Column("u_name", String(255)),
    Column("status", Integer),
    Column("createtime", Integer),
    Column("updatetime", Integer),
)


class DBSA:
    """Class-level buffered batch writer for ``video_op`` / ``video`` rows.

    Rows accumulate in two parallel class-level buffers and are flushed to
    MySQL once ``BUFFER_SIZE`` op rows have been queued (or when ``flush()``
    is called explicitly). On a failed flush, every buffered row from BOTH
    buffers is re-queued via ``push_record`` so no data is silently lost.

    NOTE(review): buffers are plain class attributes with no lock — this is
    not thread-safe; confirm all callers run in a single thread.
    """

    BUFFER_SIZE = 1000  # flush threshold: write to DB every 1000 op rows
    _buf_op: List[Dict] = []
    _buf_vid: List[Dict] = []

    # Placeholder hook for re-queuing rows on flush failure; production code
    # is expected to replace this with a real Redis push.
    push_record = staticmethod(lambda d: print("[假装推 Redis]", d))

    @classmethod
    def upsert_video(cls, data: Dict):
        """Buffer one video record (plus its op/audit row) for batch write.

        ``data`` is deep-copied so the caller's dict is never mutated.
        Expected keys include v_id, v_xid, v_name, title, link, duration,
        create_time, cover_pic, u_* fields, level, keyword, rn, batch,
        machine_id; a_id / history_status / is_repeat get defaults.
        Triggers ``flush()`` automatically once BUFFER_SIZE rows are queued.
        """
        data = copy.deepcopy(data)
        data.setdefault("a_id", 0)
        data.setdefault("history_status", "")
        data.setdefault("is_repeat", 3)
        data["sort"] = data.get("index", 0)

        # Single timestamp per call so createtime/updatetime agree across
        # both rows (original code called time.time() four times).
        now = int(time.time())

        op_row = {
            "v_id": data["v_id"],
            "v_xid": data["v_xid"],
            "a_id": data["a_id"],
            "level": data["level"],
            "name_title": data["v_name"],
            "keyword": data["keyword"],
            "rn": data["rn"],
            "history_status": data["history_status"],
            "is_repeat": data["is_repeat"],
            "sort": data["sort"],
            "createtime": now,
            "updatetime": now,
            "batch": data["batch"],
            "machine": data["machine_id"],
        }
        cls._buf_op.append(op_row)

        vid_row = {
            "v_id": data["v_id"],
            "v_xid": data["v_xid"],
            "rn": data["rn"],
            "v_name": data["v_name"],
            "title": data["title"],
            "link": data["link"],
            "edition": "",
            "duration": data["duration"],
            "public_time": data["create_time"],
            "cover_pic": data["cover_pic"],
            "sort": data["sort"],
            "u_xid": data["u_xid"],
            "u_id": data["u_id"],
            "u_pic": data["u_pic"],
            "u_name": data["u_name"],
            "status": 1,
            "createtime": now,
            "updatetime": now,
        }
        cls._buf_vid.append(vid_row)

        if len(cls._buf_op) >= cls.BUFFER_SIZE:
            cls.flush()

    @classmethod
    def _bulk_insert_op(cls):
        """Plain bulk INSERT of all buffered op rows; clears the buffer on success."""
        if not cls._buf_op:
            return
        stmt = video_op.insert().values(cls._buf_op)
        with _engine.begin() as conn:
            conn.execute(stmt)
        cls._buf_op.clear()

    @classmethod
    def _bulk_upsert_vid(cls):
        """Bulk INSERT ... ON DUPLICATE KEY UPDATE of buffered video rows.

        On key collision only the mutable fields (title, duration, cover_pic,
        sort, updatetime) are refreshed; clears the buffer on success.
        """
        if not cls._buf_vid:
            return
        stmt = mysql_insert(video).values(cls._buf_vid)
        upd = {
            "title": stmt.inserted.title,
            "duration": stmt.inserted.duration,
            "cover_pic": stmt.inserted.cover_pic,
            "sort": stmt.inserted.sort,
            "updatetime": stmt.inserted.updatetime,
        }
        ondup = stmt.on_duplicate_key_update(**upd)
        with _engine.begin() as conn:
            conn.execute(ondup)
        cls._buf_vid.clear()

    @classmethod
    def flush(cls):
        """Write both buffers to MySQL; on failure re-queue ALL buffered rows.

        FIX: the original only re-pushed ``_buf_vid`` on failure, silently
        dropping every ``_buf_op`` row despite its comment claiming both
        buffers were re-pushed to Redis. Both are now re-queued.
        Buffers are always empty when this returns.
        """
        try:
            cls._bulk_insert_op()
            cls._bulk_upsert_vid()
        except Exception as e:
            print("[DBSA] 批写失败:", e)
            # 将两段缓冲重推 Redis(与旧逻辑一致)— re-queue BOTH buffers.
            # _buf_op is non-empty here only if its insert failed (it clears
            # itself on success), so no duplicate pushes occur.
            for row in cls._buf_op:
                cls.push_record(row)
            for row in cls._buf_vid:
                cls.push_record(row)
        finally:
            cls._buf_op.clear()
            cls._buf_vid.clear()