feat: optimize video data processing logic, adjust buffer size and flush strategy, strengthen concurrency support

晓丰 2025-05-21 00:55:41 +08:00
parent bcc4751328
commit 1c4823d633
2 changed files with 61 additions and 63 deletions
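
The heart of the change in DB.py is replacing the fixed 1000-row buffer with a dual flush trigger (a 100-row threshold or a 3-second interval) and guarding the shared buffers with a threading.Lock so several worker threads can call upsert_video safely. Below is a minimal standalone sketch of that pattern; the BufferedWriter class and the write_fn callable are illustrative stand-ins, not the project's actual code.

import threading
import time
from typing import Dict, List


class BufferedWriter:
    """Sketch of the flush strategy: write when a row count OR a time interval is hit."""
    FLUSH_EVERY_ROWS = 100   # row-count threshold
    FLUSH_INTERVAL = 3       # seconds since the last flush

    def __init__(self, write_fn):
        self._rows: List[Dict] = []
        self._last_flush = time.time()
        self._lock = threading.Lock()
        self._write_fn = write_fn            # callable that persists a list of rows

    def add(self, row: Dict) -> None:
        need_flush = False
        with self._lock:                     # only the buffer bookkeeping is locked
            self._rows.append(row)
            if len(self._rows) >= self.FLUSH_EVERY_ROWS:
                need_flush = True
            elif time.time() - self._last_flush >= self.FLUSH_INTERVAL:
                need_flush = True
        if need_flush:
            self.flush()

    def flush(self) -> None:
        with self._lock:
            rows, self._rows = self._rows, []    # snapshot and reset under the lock
            self._last_flush = time.time()
        if rows:
            self._write_fn(rows)                 # slow DB write happens outside the lock


writer = BufferedWriter(lambda rows: print(f"writing {len(rows)} rows"))
writer.add({"v_xid": "demo"})
writer.flush()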

DB.py  (109 changed lines)

@@ -4,14 +4,13 @@ import pymysql
 import time
 import functools
 from redis.exceptions import ConnectionError, TimeoutError
+import time, copy, threading
+from typing import List, Dict
 from sqlalchemy import (
     create_engine, MetaData, Table, Column,
     BigInteger, Integer, String, Text
 )
 from sqlalchemy.dialects.mysql import insert as mysql_insert
-from typing import List, Dict
-import time, copy, queue

 MYSQL_URL = (
     "mysql+pymysql://db_vidcon:rexdK4fhCCiRE4BZ"
@@ -435,11 +434,15 @@ class DBVidcon:
 class DBSA:
-    BUFFER_SIZE = 1000        # flush to DB once 1,000 rows have accumulated
+    FLUSH_EVERY_ROWS = 100    # row-count threshold
+    FLUSH_INTERVAL = 3        # time threshold, in seconds
     _buf_op: List[Dict] = []
     _buf_vid: List[Dict] = []
+    _last_flush: float = time.time()
+    _lock = threading.Lock()

-    push_record = staticmethod(lambda d: print("[假装推 Redis]", d))
+    push_record = staticmethod(lambda row: print("[退回Redis]", row["v_xid"]))

     @classmethod
     def upsert_video(cls, data: Dict):
@@ -449,63 +452,52 @@ class DBSA:
         data.setdefault("is_repeat", 3)
         data["sort"] = data.get("index", 0)
+        now_ts = int(time.time())
         op_row = {
-            "v_id": data["v_id"],
-            "v_xid": data["v_xid"],
-            "a_id": data["a_id"],
-            "level": data["level"],
-            "name_title": data["v_name"],
-            "keyword": data["keyword"],
-            "rn": data["rn"],
-            "history_status": data["history_status"],
-            "is_repeat": data["is_repeat"],
-            "sort": data["sort"],
-            "createtime": int(time.time()),
-            "updatetime": int(time.time()),
-            "batch": data["batch"],
-            "machine": data["machine_id"],
+            "v_id": data["v_id"], "v_xid": data["v_xid"], "a_id": data["a_id"],
+            "level": data["level"], "name_title": data["v_name"],
+            "keyword": data["keyword"], "rn": data["rn"],
+            "history_status": data["history_status"], "is_repeat": data["is_repeat"],
+            "sort": data["sort"], "createtime": now_ts, "updatetime": now_ts,
+            "batch": data["batch"], "machine": data["machine_id"],
         }
-        cls._buf_op.append(op_row)
         vid_row = {
-            "v_id": data["v_id"],
-            "v_xid": data["v_xid"],
-            "rn": data["rn"],
-            "v_name": data["v_name"],
-            "title": data["title"],
-            "link": data["link"],
-            "edition": "",
-            "duration": data["duration"],
-            "public_time": data["create_time"],
-            "cover_pic": data["cover_pic"],
-            "sort": data["sort"],
-            "u_xid": data["u_xid"],
-            "u_id": data["u_id"],
-            "u_pic": data["u_pic"],
-            "u_name": data["u_name"],
-            "status": 1,
-            "createtime": int(time.time()),
-            "updatetime": int(time.time()),
+            "v_id": data["v_id"], "v_xid": data["v_xid"], "rn": data["rn"],
+            "v_name": data["v_name"], "title": data["title"], "link": data["link"],
+            "edition": "", "duration": data["duration"],
+            "public_time": data["create_time"], "cover_pic": data["cover_pic"],
+            "sort": data["sort"], "u_xid": data["u_xid"], "u_id": data["u_id"],
+            "u_pic": data["u_pic"], "u_name": data["u_name"],
+            "status": 1, "createtime": now_ts, "updatetime": now_ts,
+            "is_repeat": data["is_repeat"],
         }
-        cls._buf_vid.append(vid_row)
-        if len(cls._buf_op) >= cls.BUFFER_SIZE:
+        need_flush = False
+        with cls._lock:
+            cls._buf_op.append(op_row)
+            cls._buf_vid.append(vid_row)
+            if len(cls._buf_op) >= cls.FLUSH_EVERY_ROWS:
+                need_flush = True
+            elif time.time() - cls._last_flush >= cls.FLUSH_INTERVAL:
+                need_flush = True
+        if need_flush:
             cls.flush()

     @classmethod
-    def _bulk_insert_op(cls):
-        if not cls._buf_op:
+    def _bulk_insert(cls, rows: List[Dict]):
+        if not rows:
             return
-        stmt = video_op.insert().values(cls._buf_op)
+        stmt = video_op.insert().values(rows)
         with _engine.begin() as conn:
             conn.execute(stmt)
-        cls._buf_op.clear()

     @classmethod
-    def _bulk_upsert_vid(cls):
-        if not cls._buf_vid:
+    def _bulk_upsert(cls, rows: List[Dict]):
+        if not rows:
             return
-        stmt = mysql_insert(video).values(cls._buf_vid)
+        stmt = mysql_insert(video).values(rows)
         upd = {
             "title": stmt.inserted.title,
             "duration": stmt.inserted.duration,
@@ -516,17 +508,22 @@ class DBSA:
         ondup = stmt.on_duplicate_key_update(**upd)
         with _engine.begin() as conn:
             conn.execute(ondup)
-        cls._buf_vid.clear()

     @classmethod
     def flush(cls):
-        try:
-            cls._bulk_insert_op()
-            cls._bulk_upsert_vid()
-        except Exception as e:
-            print("[DBSA] 批写失败:", e)
-            # re-push both buffers to Redis, same as the old logic
-            for row in cls._buf_vid:
-                cls.push_record(row)
+        with cls._lock:
+            op_rows, vid_rows = cls._buf_op[:], cls._buf_vid[:]
             cls._buf_op.clear()
             cls._buf_vid.clear()
+            cls._last_flush = time.time()
+        if not op_rows and not vid_rows:
+            return
+        try:
+            cls._bulk_insert(op_rows)
+            cls._bulk_upsert(vid_rows)
+        except Exception as e:
+            print("[DBSA] 批写失败:", e)
+            for row in vid_rows:
+                cls.push_record(row)
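
The new _bulk_upsert keeps relying on SQLAlchemy's MySQL-dialect INSERT ... ON DUPLICATE KEY UPDATE; the commit only changes it to operate on the row snapshot handed over by flush() instead of the class-level buffer. A self-contained sketch of that upsert mechanism follows, using a hypothetical table and connection URL rather than the project's real schema.

from sqlalchemy import create_engine, MetaData, Table, Column, String, Integer, BigInteger
from sqlalchemy.dialects.mysql import insert as mysql_insert

# Hypothetical engine and table, for illustration only.
engine = create_engine("mysql+pymysql://user:password@localhost/demo")
meta = MetaData()
video_demo = Table(
    "video_demo", meta,
    Column("v_xid", String(64), primary_key=True),
    Column("title", String(255)),
    Column("duration", Integer),
    Column("updatetime", BigInteger),
)

def bulk_upsert(rows):
    if not rows:
        return
    stmt = mysql_insert(video_demo).values(rows)
    # stmt.inserted.<col> refers to the value each incoming row carries, so a row
    # that collides on the primary key gets refreshed with the new data.
    ondup = stmt.on_duplicate_key_update(
        title=stmt.inserted.title,
        duration=stmt.inserted.duration,
        updatetime=stmt.inserted.updatetime,
    )
    with engine.begin() as conn:      # one transaction per batch
        conn.execute(ondup)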

main.py  (13 changed lines)

@@ -9,15 +9,17 @@ import concurrent.futures
 import requests
 import datetime
 from requests import RequestException
-from DB import DBVidcon
+from DB import DBVidcon, DBSA
 from dateutil import parser as date_parser
 import copy
 from threading import Lock
+from concurrent.futures import ThreadPoolExecutor, as_completed

 db = DBVidcon()
 MACHINE_ID = None
 MAX_WORKERS = 10
+executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)


 def get_part_ids(part_num: int, take: int, offset: int = 0):
     part_ids = list(range(offset, offset + take))
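
This hunk only adds the imports and the module-level executor; how tasks are submitted to it is not visible here. The snippet below is an assumed usage pattern for ThreadPoolExecutor with as_completed, with a placeholder fetch_one worker standing in for the real per-keyword fetch logic.

from concurrent.futures import ThreadPoolExecutor, as_completed

MAX_WORKERS = 10
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)

def fetch_one(task_id: int) -> str:
    # placeholder for the real per-keyword fetch/parse work
    return f"task {task_id} done"

futures = [executor.submit(fetch_one, i) for i in range(25)]
for fut in as_completed(futures):   # yields futures as each finishes, not in submit order
    print(fut.result())             # .result() re-raises any exception from the worker
executor.shutdown(wait=True)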
@@ -691,7 +693,7 @@ def integrate_data_parallel():
         # —— DB write: can follow your original upsert / flush logic ——
         for item in v_list:
-            record = {
+            DBSA.upsert_video({
                 "keyword": kitem["keyword"],
                 "v_name": kitem["v_name"],
                 "v_id": item["v_id"],
@@ -713,9 +715,8 @@ def integrate_data_parallel():
                 "batch": kitem["batch"],
                 "machine_id": MACHINE_ID,
                 "level": kitem["level"],
-            }
-            db.upsert_video(record)
-        db.flush()
+            })
+        DBSA.flush()
         if rollback[0]:
             db.rollback_l0(rollback[0])
         if rollback[1]:
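
Because the time-based trigger is only evaluated inside upsert_video, a tail batch smaller than FLUSH_EVERY_ROWS would otherwise sit in the buffers; that is why main.py now calls DBSA.flush() explicitly after the per-keyword loop. Not part of this commit, but one possible safety net is a process-exit flush registered with atexit:

import atexit

# Assumption, not in the diff: flush whatever is still buffered when the process exits,
# falling back to push_record (re-queue to Redis) if the batch write fails.
atexit.register(DBSA.flush)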