diff --git a/DB.py b/DB.py
index 0aae511..9ef4298 100644
--- a/DB.py
+++ b/DB.py
@@ -5,6 +5,67 @@ import time
 import functools
 from redis.exceptions import ConnectionError, TimeoutError
+from sqlalchemy import (
+    create_engine, MetaData, Table, Column,
+    BigInteger, Integer, String, Text
+)
+from sqlalchemy.dialects.mysql import insert as mysql_insert
+from typing import List, Dict
+import copy  # `time` is already imported above; the unused `queue` import was dropped
+
+MYSQL_URL = (
+    "mysql+pymysql://db_vidcon:rexdK4fhCCiRE4BZ"
+    "@192.144.230.75:3306/db_vidcon?charset=utf8mb4"
+)
+
+_engine = create_engine(
+    MYSQL_URL,
+    pool_size=20, max_overflow=10,
+    pool_pre_ping=True, pool_recycle=3600,
+    future=True,
+)
+_meta = MetaData()
+
+video_op = Table(
+    "sh_dm_video_op_v2", _meta,
+    Column("v_id", BigInteger, primary_key=True),
+    Column("v_xid", String(64)),
+    Column("a_id", Integer),
+    Column("level", Integer),
+    Column("name_title", String(255)),
+    Column("keyword", String(255)),
+    Column("rn", String(8)),
+    Column("history_status", String(32)),
+    Column("is_repeat", Integer),
+    Column("sort", Integer),
+    Column("createtime", Integer),
+    Column("updatetime", Integer),
+    Column("batch", BigInteger),
+    Column("machine", Integer),
+)
+
+video = Table(
+    "sh_dm_video_v2", _meta,
+    Column("v_id", BigInteger, primary_key=True),
+    Column("v_xid", String(64)),
+    Column("rn", String(8)),
+    Column("v_name", String(255)),
+    Column("title", String(255)),
+    Column("link", Text),
+    Column("edition", String(64)),
+    Column("duration", Integer),
+    Column("public_time", String(32)),
+    Column("cover_pic", Text),
+    Column("sort", Integer),
+    Column("u_xid", String(64)),
+    Column("u_id", BigInteger),
+    Column("u_pic", Text),
+    Column("u_name", String(255)),
+    Column("status", Integer),
+    Column("createtime", Integer),
+    Column("updatetime", Integer),
+)
+
 def mysql_retry(max_retries: int = 3, base_delay: float = 2.0):
     """
@@ -371,3 +432,105 @@ class DBVidcon:
         item = self.redis.lpop(self.error_list_key)
         # if the stored value is a JSON string, deserialize it here:
         return json.loads(item) if item is not None else None
+
+
+class DBSA:
+    BUFFER_SIZE = 1000  # flush to MySQL once 1000 rows have buffered
+    _buf_op: List[Dict] = []
+    _buf_vid: List[Dict] = []
+
+    push_record = staticmethod(lambda d: print("[stub: pretend push to Redis]", d))
+
+    @classmethod
+    def upsert_video(cls, data: Dict):
+        data = copy.deepcopy(data)
+        data.setdefault("a_id", 0)
+        data.setdefault("history_status", "")
+        data.setdefault("is_repeat", 3)
+        data["sort"] = data.get("index", 0)
+
+        op_row = {
+            "v_id": data["v_id"],
+            "v_xid": data["v_xid"],
+            "a_id": data["a_id"],
+            "level": data["level"],
+            "name_title": data["v_name"],
+            "keyword": data["keyword"],
+            "rn": data["rn"],
+            "history_status": data["history_status"],
+            "is_repeat": data["is_repeat"],
+            "sort": data["sort"],
+            "createtime": int(time.time()),
+            "updatetime": int(time.time()),
+            "batch": data["batch"],
+            "machine": data["machine_id"],
+        }
+        cls._buf_op.append(op_row)
+
+        vid_row = {
+            "v_id": data["v_id"],
+            "v_xid": data["v_xid"],
+            "rn": data["rn"],
+            "v_name": data["v_name"],
+            "title": data["title"],
+            "link": data["link"],
+            "edition": "",
+            "duration": data["duration"],
+            "public_time": data["create_time"],
+            "cover_pic": data["cover_pic"],
+            "sort": data["sort"],
+            "u_xid": data["u_xid"],
+            "u_id": data["u_id"],
+            "u_pic": data["u_pic"],
+            "u_name": data["u_name"],
+            "status": 1,
+            "createtime": int(time.time()),
+            "updatetime": int(time.time()),
+        }
+        cls._buf_vid.append(vid_row)
+
+        if len(cls._buf_op) >= cls.BUFFER_SIZE:
+            cls.flush()
+
+    @classmethod
+    def _bulk_insert_op(cls):
+        if not cls._buf_op:
+            return
+        stmt = video_op.insert().values(cls._buf_op)
+        with _engine.begin() as conn:
+            conn.execute(stmt)
+        cls._buf_op.clear()
+
+    @classmethod
+    def _bulk_upsert_vid(cls):
+        if not cls._buf_vid:
+            return
+        stmt = mysql_insert(video).values(cls._buf_vid)
+        # on duplicate key, only refresh the mutable fields; first-seen
+        # metadata (link, u_* fields, createtime, ...) is left untouched
+        upd = {
+            "title": stmt.inserted.title,
+            "duration": stmt.inserted.duration,
+            "cover_pic": stmt.inserted.cover_pic,
+            "sort": stmt.inserted.sort,
+            "updatetime": stmt.inserted.updatetime,
+        }
+        ondup = stmt.on_duplicate_key_update(**upd)
+        with _engine.begin() as conn:
+            conn.execute(ondup)
+        cls._buf_vid.clear()
+
+    @classmethod
+    def flush(cls):
+        try:
+            cls._bulk_insert_op()
+            cls._bulk_upsert_vid()
+        except Exception as e:
+            print("[DBSA] bulk write failed:", e)
+            # re-push both buffers back to Redis (same fallback as the old logic)
+            for row in cls._buf_op:
+                cls.push_record(row)
+            for row in cls._buf_vid:
+                cls.push_record(row)
+            cls._buf_op.clear()
+            cls._buf_vid.clear()
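
Usage sketch (not part of the patch): a minimal driver showing how the new
DBSA buffer is meant to be fed, assuming the caller builds dicts with the
fields upsert_video() reads. The `records` iterable and its keys are
hypothetical stand-ins for whatever the crawler actually produces.

    import atexit
    from DB import DBSA

    # upsert_video() only flushes automatically once BUFFER_SIZE rows have
    # accumulated, so flush the remainder explicitly on shutdown
    atexit.register(DBSA.flush)

    for rec in records:  # hypothetical source of crawled rows
        DBSA.upsert_video({
            "v_id": rec["v_id"], "v_xid": rec["v_xid"],
            "v_name": rec["v_name"], "title": rec["title"],
            "link": rec["link"], "duration": rec["duration"],
            "create_time": rec["create_time"],  # stored as public_time
            "cover_pic": rec["cover_pic"],
            "u_xid": rec["u_xid"], "u_id": rec["u_id"],
            "u_pic": rec["u_pic"], "u_name": rec["u_name"],
            "level": rec["level"], "keyword": rec["keyword"],
            "rn": rec["rn"], "batch": rec["batch"],
            "machine_id": rec["machine_id"],
            "index": rec.get("index", 0),  # becomes the `sort` column
        })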