feat: 优化数据库操作逻辑,添加唯一索引检查以避免重复记录
This commit is contained in:
parent
b6ffe17404
commit
4141837764
307
DB.py
307
DB.py
@ -658,51 +658,112 @@ class DBSA:
|
|||||||
FLUSH_EVERY_ROWS = 100 # 行阈值
|
FLUSH_EVERY_ROWS = 100 # 行阈值
|
||||||
FLUSH_INTERVAL = 30 # 秒阈值
|
FLUSH_INTERVAL = 30 # 秒阈值
|
||||||
|
|
||||||
_buf_op: List[Dict] = []
|
_buf_op: list = []
|
||||||
_buf_vid: List[Dict] = []
|
_buf_vid: list = []
|
||||||
_buf_payload: List[Dict] = []
|
_buf_payload: list = []
|
||||||
_last_flush: float = time.time()
|
_last_flush: float = time.time()
|
||||||
_lock = threading.Lock()
|
_lock = threading.Lock()
|
||||||
|
_existing_op_keys = set() # 用于操作记录表的索引键
|
||||||
|
_existing_vid_keys = set() # 用于视频表的唯一索引键
|
||||||
|
|
||||||
push_record_many = staticmethod(
|
@staticmethod
|
||||||
lambda rows: logger.info("[退回Redis] cnt=", len(rows))
|
def push_record_many(rows):
|
||||||
)
|
"""退回Redis的模拟方法"""
|
||||||
|
logger.info(f"[退回Redis] cnt={len(rows)}")
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def upsert_video(cls, data):
|
def upsert_video(cls, data):
|
||||||
data = copy.deepcopy(data)
|
data = copy.deepcopy(data)
|
||||||
|
# 设置默认值
|
||||||
data.setdefault("a_id", 0)
|
data.setdefault("a_id", 0)
|
||||||
data.setdefault("history_status", "")
|
data.setdefault("history_status", "")
|
||||||
data.setdefault("is_repeat", 3) # 避免 KeyError
|
data.setdefault("is_repeat", 3)
|
||||||
|
data.setdefault("keyword", "")
|
||||||
data["sort"] = data.get("index", 0)
|
data["sort"] = data.get("index", 0)
|
||||||
|
|
||||||
now_ts = int(time.time())
|
now_ts = int(time.time())
|
||||||
op_row = {
|
|
||||||
"v_id": data["v_id"], "v_xid": data["v_xid"], "a_id": data["a_id"],
|
# 生成操作记录表的唯一索引键
|
||||||
"level": data["level"], "name_title": data["v_name"],
|
op_index_key = (
|
||||||
"keyword": data["keyword"], "rn": data["rn"],
|
data["rn"] or "",
|
||||||
"history_status": data["history_status"], "is_repeat": data["is_repeat"],
|
data["v_name"] or "",
|
||||||
"sort": data["sort"], "createtime": now_ts, "updatetime": now_ts,
|
data["keyword"] or "",
|
||||||
"batch": data["batch"], "machine": data["machine_id"],
|
data["v_xid"] or "",
|
||||||
}
|
now_ts
|
||||||
vid_row = {
|
)
|
||||||
"v_id": data["v_id"], "v_xid": data["v_xid"], "rn": data["rn"],
|
|
||||||
"v_name": data["v_name"], "title": data["title"], "link": data["link"],
|
# 生成视频表的唯一索引键
|
||||||
"edition": "", "duration": data["duration"],
|
vid_index_key = (
|
||||||
"public_time": data["create_time"], "cover_pic": data["cover_pic"],
|
data["v_xid"] or "",
|
||||||
"sort": data["sort"], "u_xid": data["u_xid"], "u_id": data["u_id"],
|
data["v_name"] or ""
|
||||||
"u_pic": data["u_pic"], "u_name": data["u_name"],
|
)
|
||||||
"status": 1, "createtime": now_ts, "updatetime": now_ts, "watch_number": data.get("view", 0),
|
|
||||||
"follow_number": data.get("fans", 0),
|
|
||||||
"video_number": data.get("videos", 0),
|
|
||||||
}
|
|
||||||
|
|
||||||
with cls._lock:
|
with cls._lock:
|
||||||
|
# 检查两个索引键是否已存在
|
||||||
|
if op_index_key in cls._existing_op_keys:
|
||||||
|
logger.debug(f"跳过重复操作记录: {op_index_key}")
|
||||||
|
return
|
||||||
|
|
||||||
|
if vid_index_key in cls._existing_vid_keys:
|
||||||
|
logger.debug(f"跳过重复视频记录: {vid_index_key}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 生成操作记录行
|
||||||
|
op_row = {
|
||||||
|
"v_id": data["v_id"],
|
||||||
|
"v_xid": data["v_xid"],
|
||||||
|
"a_id": data["a_id"],
|
||||||
|
"level": data["level"],
|
||||||
|
"name_title": data["v_name"],
|
||||||
|
"keyword": data["keyword"],
|
||||||
|
"rn": data["rn"],
|
||||||
|
"history_status": data["history_status"],
|
||||||
|
"is_repeat": data["is_repeat"],
|
||||||
|
"sort": data["sort"],
|
||||||
|
"createtime": now_ts,
|
||||||
|
"updatetime": now_ts,
|
||||||
|
"operatetime": now_ts,
|
||||||
|
"batch": data["batch"],
|
||||||
|
"machine": data.get("machine_id", 0),
|
||||||
|
"is_piracy": data.get("is_piracy", '3'),
|
||||||
|
"ts_status": data.get("ts_status", 1),
|
||||||
|
}
|
||||||
|
|
||||||
|
# 视频行生成
|
||||||
|
vid_row = {
|
||||||
|
"v_id": data["v_id"],
|
||||||
|
"v_xid": data["v_xid"],
|
||||||
|
"rn": data["rn"],
|
||||||
|
"v_name": data["v_name"],
|
||||||
|
"title": data["title"],
|
||||||
|
"link": data["link"],
|
||||||
|
"edition": "",
|
||||||
|
"duration": str(data["duration"]) if data.get("duration") else '0',
|
||||||
|
"public_time": data["create_time"],
|
||||||
|
"cover_pic": data["cover_pic"],
|
||||||
|
"sort": data["sort"],
|
||||||
|
"u_xid": data["u_xid"],
|
||||||
|
"u_id": data["u_id"],
|
||||||
|
"u_pic": data["u_pic"],
|
||||||
|
"u_name": data["u_name"],
|
||||||
|
"status": 1,
|
||||||
|
"createtime": now_ts,
|
||||||
|
"updatetime": now_ts,
|
||||||
|
"operatetime": now_ts,
|
||||||
|
"watch_number": data.get("view", 0),
|
||||||
|
"follow_number": data.get("fans", 0),
|
||||||
|
"video_number": data.get("videos", 0),
|
||||||
|
"is_repeat": 0, # 初始设为0,flush时更新
|
||||||
|
"is_piracy": data.get("is_piracy", 3),
|
||||||
|
"ts_status": data.get("ts_status", 1),
|
||||||
|
}
|
||||||
|
|
||||||
cls._buf_op.append(op_row)
|
cls._buf_op.append(op_row)
|
||||||
cls._buf_vid.append(vid_row)
|
cls._buf_vid.append(vid_row)
|
||||||
cls._buf_payload.append(data) # 保存原始
|
cls._buf_payload.append(data)
|
||||||
|
cls._existing_op_keys.add(op_index_key)
|
||||||
|
cls._existing_vid_keys.add(vid_index_key)
|
||||||
buf_len = len(cls._buf_vid)
|
buf_len = len(cls._buf_vid)
|
||||||
# logger.info(f"DB缓冲 -> xid={data['v_xid']}, level={data['level']}, buffer={buf_len}")
|
|
||||||
|
|
||||||
need_flush = False
|
need_flush = False
|
||||||
flush_reason = ""
|
flush_reason = ""
|
||||||
@ -714,33 +775,9 @@ class DBSA:
|
|||||||
flush_reason = "TIME"
|
flush_reason = "TIME"
|
||||||
|
|
||||||
if need_flush:
|
if need_flush:
|
||||||
logger.info(f"DBSA 落 ({flush_reason}) ...")
|
logger.info(f"DBSA 落盘 ({flush_reason}) ...")
|
||||||
cls.flush()
|
cls.flush()
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def _bulk_insert(cls, rows: List[Dict]):
|
|
||||||
if not rows:
|
|
||||||
return
|
|
||||||
stmt = video_op.insert().values(rows)
|
|
||||||
with _engine.begin() as conn:
|
|
||||||
conn.execute(stmt)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def _bulk_upsert(cls, rows: List[Dict]):
|
|
||||||
if not rows:
|
|
||||||
return
|
|
||||||
stmt = mysql_insert(video).values(rows)
|
|
||||||
upd = {
|
|
||||||
"title": stmt.inserted.title,
|
|
||||||
"duration": stmt.inserted.duration,
|
|
||||||
"cover_pic": stmt.inserted.cover_pic,
|
|
||||||
"sort": stmt.inserted.sort,
|
|
||||||
"updatetime": stmt.inserted.updatetime,
|
|
||||||
}
|
|
||||||
ondup = stmt.on_duplicate_key_update(**upd)
|
|
||||||
with _engine.begin() as conn:
|
|
||||||
conn.execute(ondup)
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def flush(cls):
|
def flush(cls):
|
||||||
with cls._lock:
|
with cls._lock:
|
||||||
@ -751,42 +788,103 @@ class DBSA:
|
|||||||
cls._buf_vid.clear()
|
cls._buf_vid.clear()
|
||||||
cls._buf_payload.clear()
|
cls._buf_payload.clear()
|
||||||
cls._last_flush = time.time()
|
cls._last_flush = time.time()
|
||||||
|
existing_op_keys_batch = cls._existing_op_keys.copy()
|
||||||
|
existing_vid_keys_batch = cls._existing_vid_keys.copy()
|
||||||
|
cls._existing_op_keys.clear()
|
||||||
|
cls._existing_vid_keys.clear()
|
||||||
|
|
||||||
if not op_rows and not vid_rows:
|
if not op_rows and not vid_rows:
|
||||||
return
|
return
|
||||||
|
|
||||||
existing_keys = set()
|
# ========== 视频表重复检测 ==========
|
||||||
|
vid_existing_keys = set()
|
||||||
if vid_rows:
|
if vid_rows:
|
||||||
all_keys = list({(row["v_xid"], row["rn"]) for row in vid_rows})
|
# 生成所有视频唯一键 (v_xid, v_name)
|
||||||
|
all_vid_keys = list({(row["v_xid"], row["v_name"]) for row in vid_rows})
|
||||||
|
|
||||||
conn = _engine.connect()
|
conn = _engine.connect()
|
||||||
try:
|
try:
|
||||||
sel_vid = (
|
# 查询数据库中已存在的视频记录
|
||||||
video.select()
|
sel_vid = select([
|
||||||
.with_only_columns(video.c.v_xid, video.c.rn)
|
video.c.v_xid,
|
||||||
.where(tuple_(video.c.v_xid, video.c.rn).in_(all_keys))
|
video.c.v_name
|
||||||
|
]).where(
|
||||||
|
tuple_(video.c.v_xid, video.c.v_name).in_(all_vid_keys)
|
||||||
)
|
)
|
||||||
existing_keys = {(row.v_xid, row.rn) for row in conn.execute(sel_vid).fetchall()}
|
result = conn.execute(sel_vid)
|
||||||
|
vid_existing_keys = {(row.v_xid, row.v_name) for row in result}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.info(f"[DBSA] 查询 video 表时异常: {e}")
|
logger.error(f"查询视频表时异常: {e}")
|
||||||
try:
|
try:
|
||||||
cls.push_record_many(payloads)
|
cls.push_record_many(payloads)
|
||||||
except Exception as re:
|
except Exception as re:
|
||||||
logger.info("[Redis 回退失败]", re)
|
logger.error("[Redis 回退失败]", re)
|
||||||
return
|
return
|
||||||
finally:
|
finally:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
# 先给日志表的 op_rows 设置 0/1:1=重复,0=不重复
|
# 设置重复标志
|
||||||
for i, vid_row in enumerate(vid_rows):
|
for i, vid_row in enumerate(vid_rows):
|
||||||
key = (vid_row["v_xid"], vid_row["rn"])
|
key = (vid_row["v_xid"], vid_row["v_name"])
|
||||||
op_rows[i]["is_repeat"] = 1 if key in existing_keys else 0
|
vid_row["is_repeat"] = 1 if key in vid_existing_keys else 0
|
||||||
|
# 同时设置操作记录的重复标志
|
||||||
|
if i < len(op_rows):
|
||||||
|
op_rows[i]["is_repeat"] = vid_row["is_repeat"]
|
||||||
|
|
||||||
# 再把同样的 is_repeat 值写到 vid_rows,以便视频表也能存到 0/1
|
# ========== 操作记录表去重 ==========
|
||||||
for i, vid_row in enumerate(vid_rows):
|
final_op_rows = []
|
||||||
vid_row["is_repeat"] = op_rows[i]["is_repeat"]
|
if op_rows:
|
||||||
vid_row.pop("level", None)
|
try:
|
||||||
|
# 生成所有操作记录索引键
|
||||||
|
op_check_keys = [
|
||||||
|
(row["rn"], row["name_title"], row["keyword"], row["v_xid"], row["createtime"])
|
||||||
|
for row in op_rows
|
||||||
|
]
|
||||||
|
|
||||||
# 以下作者表、日志表和视频表写入逻辑保持不变...
|
# 查询数据库已存在的操作记录
|
||||||
|
conn = _engine.connect()
|
||||||
|
stmt = select([
|
||||||
|
video_op.c.rn,
|
||||||
|
video_op.c.name_title,
|
||||||
|
video_op.c.keyword,
|
||||||
|
video_op.c.v_xid,
|
||||||
|
video_op.c.createtime
|
||||||
|
]).where(
|
||||||
|
tuple_(
|
||||||
|
video_op.c.rn,
|
||||||
|
video_op.c.name_title,
|
||||||
|
video_op.c.keyword,
|
||||||
|
video_op.c.v_xid,
|
||||||
|
video_op.c.createtime
|
||||||
|
).in_(op_check_keys)
|
||||||
|
)
|
||||||
|
result = conn.execute(stmt)
|
||||||
|
existing_db_op_keys = {tuple(row) for row in result}
|
||||||
|
conn.close()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"查询操作记录表失败: {e}")
|
||||||
|
try:
|
||||||
|
cls.push_record_many(payloads)
|
||||||
|
except Exception as re:
|
||||||
|
logger.error("Redis退回失败", re)
|
||||||
|
return
|
||||||
|
|
||||||
|
# 过滤重复操作记录
|
||||||
|
for idx, row in enumerate(op_rows):
|
||||||
|
index_key = (
|
||||||
|
row["rn"],
|
||||||
|
row["name_title"],
|
||||||
|
row["keyword"],
|
||||||
|
row["v_xid"],
|
||||||
|
row["createtime"]
|
||||||
|
)
|
||||||
|
|
||||||
|
# 检查是否在数据库或当前批次中重复
|
||||||
|
if index_key not in existing_db_op_keys and index_key not in existing_op_keys_batch:
|
||||||
|
final_op_rows.append(row)
|
||||||
|
existing_op_keys_batch.add(index_key)
|
||||||
|
|
||||||
|
# ========== 作者表处理 ==========
|
||||||
authors_map = {}
|
authors_map = {}
|
||||||
now_ts = int(time.time())
|
now_ts = int(time.time())
|
||||||
for data in payloads:
|
for data in payloads:
|
||||||
@ -823,36 +921,69 @@ class DBSA:
|
|||||||
try:
|
try:
|
||||||
cls._execute_with_deadlock_retry(ondup_author)
|
cls._execute_with_deadlock_retry(ondup_author)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.info(f"[DBSA] 写作者表失败(死锁重试后仍未成功): {e}")
|
logger.error(f"写作者表失败: {e}")
|
||||||
try:
|
try:
|
||||||
cls.push_record_many(payloads)
|
cls.push_record_many(payloads)
|
||||||
except Exception as re:
|
except Exception as re:
|
||||||
logger.info("[Redis 回退失败]", re)
|
logger.error("[Redis 回退失败]", re)
|
||||||
return
|
return
|
||||||
|
|
||||||
if op_rows:
|
# ========== 操作记录表写入 ==========
|
||||||
|
if final_op_rows:
|
||||||
try:
|
try:
|
||||||
cls._bulk_insert(op_rows)
|
# 按索引字段排序优化插入性能
|
||||||
|
final_op_rows.sort(key=lambda x: (
|
||||||
|
x['rn'] or '',
|
||||||
|
x['name_title'] or '',
|
||||||
|
x['keyword'] or '',
|
||||||
|
x['v_xid'] or '',
|
||||||
|
x['createtime']
|
||||||
|
))
|
||||||
|
|
||||||
|
stmt = video_op.insert().values(final_op_rows)
|
||||||
|
with _engine.begin() as conn:
|
||||||
|
conn.execute(stmt)
|
||||||
|
logger.info(f"插入操作记录: {len(final_op_rows)} 条")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.info(f"[DBSA] 写日志表失败: {e}")
|
logger.error(f"插入操作记录失败: {e}")
|
||||||
try:
|
try:
|
||||||
cls.push_record_many(payloads)
|
cls.push_record_many(payloads)
|
||||||
except Exception as re:
|
except Exception as re:
|
||||||
logger.info("[Redis 回退失败]", re)
|
logger.error("[Redis 回退失败]", re)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# ========== 视频表写入 ==========
|
||||||
if vid_rows:
|
if vid_rows:
|
||||||
try:
|
try:
|
||||||
cls._bulk_upsert(vid_rows)
|
# 使用新的唯一键 (v_xid, v_name) 进行 upsert
|
||||||
|
stmt = mysql_insert(video).values(vid_rows)
|
||||||
|
upd = {
|
||||||
|
"title": stmt.inserted.title,
|
||||||
|
"duration": stmt.inserted.duration,
|
||||||
|
"cover_pic": stmt.inserted.cover_pic,
|
||||||
|
"sort": stmt.inserted.sort,
|
||||||
|
"updatetime": stmt.inserted.updatetime,
|
||||||
|
"watch_number": stmt.inserted.watch_number,
|
||||||
|
"follow_number": stmt.inserted.follow_number,
|
||||||
|
"video_number": stmt.inserted.video_number,
|
||||||
|
"is_repeat": stmt.inserted.is_repeat,
|
||||||
|
"is_piracy": stmt.inserted.is_piracy,
|
||||||
|
"ts_status": stmt.inserted.ts_status,
|
||||||
|
}
|
||||||
|
ondup = stmt.on_duplicate_key_update(**upd)
|
||||||
|
with _engine.begin() as conn:
|
||||||
|
conn.execute(ondup)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"[DBSA] flush 完成:authors={len(author_rows)} 条,op={len(op_rows)} 条,video={len(vid_rows)} 条"
|
f"[DBSA] flush 完成:authors={len(author_rows)} 条, "
|
||||||
|
f"op={len(final_op_rows)} 条, video={len(vid_rows)} 条"
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.info(f"[DBSA] 写视频表失败: {e} op={len(op_rows)} video={len(vid_rows)}")
|
logger.error(f"写视频表失败: {e}")
|
||||||
try:
|
try:
|
||||||
cls.push_record_many(payloads)
|
cls.push_record_many(payloads)
|
||||||
except Exception as re:
|
except Exception as re:
|
||||||
logger.info("[Redis 回退失败]", re)
|
logger.error("[Redis 回退失败]", re)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def update_video_stats(cls, locator: dict, stats: dict) -> int:
|
def update_video_stats(cls, locator: dict, stats: dict) -> int:
|
||||||
@ -880,7 +1011,7 @@ class DBSA:
|
|||||||
@classmethod
|
@classmethod
|
||||||
def update_video_stats_async(cls, locator: dict, stats: dict) -> None:
|
def update_video_stats_async(cls, locator: dict, stats: dict) -> None:
|
||||||
"""
|
"""
|
||||||
异步更新 sh_dm_video_v2 表中的统计字段,立即返回,不阻塞调用线程。
|
异步更新 sh_dm_video_v3 表中的统计字段,立即返回,不阻塞调用线程。
|
||||||
"""
|
"""
|
||||||
thread = threading.Thread(
|
thread = threading.Thread(
|
||||||
target=cls.update_video_stats,
|
target=cls.update_video_stats,
|
||||||
@ -893,17 +1024,13 @@ class DBSA:
|
|||||||
def _execute_with_deadlock_retry(cls, statement):
|
def _execute_with_deadlock_retry(cls, statement):
|
||||||
for attempt in range(3):
|
for attempt in range(3):
|
||||||
try:
|
try:
|
||||||
|
|
||||||
with _engine.begin() as conn:
|
with _engine.begin() as conn:
|
||||||
conn.execute(statement)
|
conn.execute(statement)
|
||||||
return
|
return
|
||||||
except OperationalError as e:
|
except OperationalError as e:
|
||||||
# e.orig.args[0] == 1213 表示死锁
|
# 检查是否为死锁错误
|
||||||
code = None
|
code = getattr(e.orig, "args", [None])[0]
|
||||||
if hasattr(e.orig, "args") and len(e.orig.args) >= 1:
|
if code == 1213 and attempt < 2: # 1213 是MySQL死锁错误码
|
||||||
code = e.orig.args[0]
|
time.sleep(0.5 * (attempt + 1))
|
||||||
if code == 1213 and attempt < 3 - 1:
|
|
||||||
time.sleep(1)
|
|
||||||
continue
|
continue
|
||||||
# 不是死锁,或者已经重试到达上限,直接抛出
|
raise
|
||||||
raise
|
|
@ -94,7 +94,7 @@ while True:
|
|||||||
)
|
)
|
||||||
db.flush()
|
db.flush()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.logger.error(f"ID:{li['id']}, e:{e}")
|
logger.error(f"ID:{li['id']}, e:{e}")
|
||||||
db.update_fight_record_status(li['id'], 0, 3, str(e), mid=MACHINE_ID)
|
db.update_fight_record_status(li['id'], 0, 3, str(e), mid=MACHINE_ID)
|
||||||
time.sleep(60) # 出错延迟
|
time.sleep(60) # 出错延迟
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user