diff --git a/DB.py b/DB.py
index 9ef4298..565e407 100644
--- a/DB.py
+++ b/DB.py
@@ -4,14 +4,13 @@ import pymysql
 import time
 import functools
 from redis.exceptions import ConnectionError, TimeoutError
-
+import time, copy, threading
+from typing import List, Dict
 from sqlalchemy import (
     create_engine, MetaData, Table, Column,
     BigInteger, Integer, String, Text
 )
 from sqlalchemy.dialects.mysql import insert as mysql_insert
-from typing import List, Dict
-import time, copy, queue
 
 MYSQL_URL = (
     "mysql+pymysql://db_vidcon:rexdK4fhCCiRE4BZ"
@@ -435,11 +434,15 @@ class DBVidcon:
 
 
 class DBSA:
-    BUFFER_SIZE = 1000  # flush to DB once 1000 rows are buffered
+    FLUSH_EVERY_ROWS = 100  # row-count threshold
+    FLUSH_INTERVAL = 3      # time threshold, seconds
+
     _buf_op: List[Dict] = []
     _buf_vid: List[Dict] = []
+    _last_flush: float = time.time()
+    _lock = threading.Lock()
 
-    push_record = staticmethod(lambda d: print("[pretend push to Redis]", d))
+    push_record = staticmethod(lambda row: print("[requeue to Redis]", row["v_xid"]))
 
     @classmethod
     def upsert_video(cls, data: Dict):
@@ -449,63 +452,52 @@ class DBSA:
         data.setdefault("is_repeat", 3)
         data["sort"] = data.get("index", 0)
 
+        now_ts = int(time.time())
         op_row = {
-            "v_id": data["v_id"],
-            "v_xid": data["v_xid"],
-            "a_id": data["a_id"],
-            "level": data["level"],
-            "name_title": data["v_name"],
-            "keyword": data["keyword"],
-            "rn": data["rn"],
-            "history_status": data["history_status"],
-            "is_repeat": data["is_repeat"],
-            "sort": data["sort"],
-            "createtime": int(time.time()),
-            "updatetime": int(time.time()),
-            "batch": data["batch"],
-            "machine": data["machine_id"],
+            "v_id": data["v_id"], "v_xid": data["v_xid"], "a_id": data["a_id"],
+            "level": data["level"], "name_title": data["v_name"],
+            "keyword": data["keyword"], "rn": data["rn"],
+            "history_status": data["history_status"], "is_repeat": data["is_repeat"],
+            "sort": data["sort"], "createtime": now_ts, "updatetime": now_ts,
+            "batch": data["batch"], "machine": data["machine_id"],
         }
-        cls._buf_op.append(op_row)
-
         vid_row = {
-            "v_id": data["v_id"],
-            "v_xid": data["v_xid"],
-            "rn": data["rn"],
-            "v_name": data["v_name"],
-            "title": data["title"],
-            "link": data["link"],
-            "edition": "",
-            "duration": data["duration"],
-            "public_time": data["create_time"],
-            "cover_pic": data["cover_pic"],
-            "sort": data["sort"],
-            "u_xid": data["u_xid"],
-            "u_id": data["u_id"],
-            "u_pic": data["u_pic"],
-            "u_name": data["u_name"],
-            "status": 1,
-            "createtime": int(time.time()),
-            "updatetime": int(time.time()),
+            "v_id": data["v_id"], "v_xid": data["v_xid"], "rn": data["rn"],
+            "v_name": data["v_name"], "title": data["title"], "link": data["link"],
+            "edition": "", "duration": data["duration"],
+            "public_time": data["create_time"], "cover_pic": data["cover_pic"],
+            "sort": data["sort"], "u_xid": data["u_xid"], "u_id": data["u_id"],
+            "u_pic": data["u_pic"], "u_name": data["u_name"],
+            "status": 1, "createtime": now_ts, "updatetime": now_ts,
+            "is_repeat": data["is_repeat"],
         }
-        cls._buf_vid.append(vid_row)
 
-        if len(cls._buf_op) >= cls.BUFFER_SIZE:
+        need_flush = False
+        with cls._lock:
+            cls._buf_op.append(op_row)
+            cls._buf_vid.append(vid_row)
+
+            if len(cls._buf_op) >= cls.FLUSH_EVERY_ROWS:
+                need_flush = True
+            elif time.time() - cls._last_flush >= cls.FLUSH_INTERVAL:
+                need_flush = True
+
+        if need_flush:
             cls.flush()
 
     @classmethod
-    def _bulk_insert_op(cls):
-        if not cls._buf_op:
+    def _bulk_insert(cls, rows: List[Dict]):
+        if not rows:
             return
-        stmt = video_op.insert().values(cls._buf_op)
+        stmt = video_op.insert().values(rows)
         with _engine.begin() as conn:
             conn.execute(stmt)
-        cls._buf_op.clear()
 
     @classmethod
-    def _bulk_upsert_vid(cls):
-        if not cls._buf_vid:
+    def _bulk_upsert(cls, rows: List[Dict]):
+        if not rows:
             return
-        stmt = mysql_insert(video).values(cls._buf_vid)
+        stmt = mysql_insert(video).values(rows)
         upd = {
             "title": stmt.inserted.title,
             "duration": stmt.inserted.duration,
@@ -516,17 +508,22 @@ class DBSA:
         ondup = stmt.on_duplicate_key_update(**upd)
         with _engine.begin() as conn:
             conn.execute(ondup)
-        cls._buf_vid.clear()
 
     @classmethod
     def flush(cls):
-        try:
-            cls._bulk_insert_op()
-            cls._bulk_upsert_vid()
-        except Exception as e:
-            print("[DBSA] batch write failed:", e)
-            # re-push both buffers to Redis (same as the old logic)
-            for row in cls._buf_vid:
-                cls.push_record(row)
+        with cls._lock:
+            op_rows, vid_rows = cls._buf_op[:], cls._buf_vid[:]
             cls._buf_op.clear()
             cls._buf_vid.clear()
+            cls._last_flush = time.time()
+
+        if not op_rows and not vid_rows:
+            return
+
+        try:
+            cls._bulk_insert(op_rows)
+            cls._bulk_upsert(vid_rows)
+        except Exception as e:
+            print("[DBSA] batch write failed:", e)
+            for row in vid_rows:
+                cls.push_record(row)
diff --git a/main.py b/main.py
index 84b88c5..af1046f 100644
--- a/main.py
+++ b/main.py
@@ -9,15 +9,17 @@ import concurrent.futures
 import requests
 import datetime
 from requests import RequestException
-from DB import DBVidcon
+from DB import DBVidcon, DBSA
 from dateutil import parser as date_parser
 import copy
 from threading import Lock
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
 
 db = DBVidcon()
 MACHINE_ID = None
 MAX_WORKERS = 10
-
+executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
 
 def get_part_ids(part_num: int, take: int, offset: int = 0):
     part_ids = list(range(offset, offset + take))
@@ -691,7 +693,7 @@ def integrate_data_parallel():
 
         # —— DB write: keep your original upsert / flush logic ——
         for item in v_list:
-            record = {
+            DBSA.upsert_video({
                 "keyword": kitem["keyword"],
                 "v_name": kitem["v_name"],
                 "v_id": item["v_id"],
@@ -713,9 +715,8 @@ def integrate_data_parallel():
                 "batch": kitem["batch"],
                 "machine_id": MACHINE_ID,
                 "level": kitem["level"],
-            }
-            db.upsert_video(record)
-            db.flush()
+            })
+        DBSA.flush()
     if rollback[0]:
         db.rollback_l0(rollback[0])
     if rollback[1]:
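For reference, below is a minimal, self-contained sketch (not part of the patch) of the buffering pattern that DBSA now follows: rows accumulate under a lock and are drained when either the row-count threshold or the time threshold is reached, with a final explicit flush() clearing whatever remains. The Buffer class and the write_rows callable are illustrative names only, assumed for this example.

import threading
import time
from typing import Dict, List


class Buffer:
    FLUSH_EVERY_ROWS = 100   # flush once this many rows are buffered
    FLUSH_INTERVAL = 3       # or once this many seconds have passed since the last flush

    def __init__(self, write_rows):
        self._rows: List[Dict] = []
        self._last_flush = time.time()
        self._lock = threading.Lock()
        self._write_rows = write_rows   # callable that persists one batch of rows

    def add(self, row: Dict):
        need_flush = False
        with self._lock:
            self._rows.append(row)
            if (len(self._rows) >= self.FLUSH_EVERY_ROWS
                    or time.time() - self._last_flush >= self.FLUSH_INTERVAL):
                need_flush = True
        if need_flush:              # flush outside the lock, as DBSA.upsert_video does
            self.flush()

    def flush(self):
        with self._lock:            # swap the buffer out under the lock, write afterwards
            rows, self._rows = self._rows, []
            self._last_flush = time.time()
        if rows:
            self._write_rows(rows)


if __name__ == "__main__":
    buf = Buffer(lambda rows: print(f"writing {len(rows)} rows"))
    for i in range(250):
        buf.add({"i": i})
    buf.flush()   # final drain, mirroring the explicit DBSA.flush() call in main.py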