diff --git a/DB.py b/DB.py
index 76cec69..78b9266 100644
--- a/DB.py
+++ b/DB.py
@@ -671,6 +671,13 @@ class DBVidcon:
         # If a JSON string was stored, deserialize it here:
         return json.loads(item) if item is not None else None
 
+import time
+import threading
+import copy
+from datetime import datetime
+from sqlalchemy.dialects.mysql import insert as mysql_insert
+from app import _engine, logger
+from app.models import video, video_op, video_author
 
 class DBSA:
     FLUSH_EVERY_ROWS = 100
@@ -681,6 +688,8 @@ class DBSA:
     _buf_payload = []
     _last_flush = time.time()
     _lock = threading.Lock()
+    _existing_op_keys = set()
+    _existing_vid_keys = set()
 
     @staticmethod
     def push_record_many(rows):
@@ -690,66 +699,85 @@ class DBSA:
     def upsert_video(cls, data):
         data = copy.deepcopy(data)
         data.setdefault("a_id", 0)
-        data.setdefault("history_status", "")
+        data.setdefault("is_repeat", 3)
         data.setdefault("keyword", "")
         data["sort"] = data.get("index", 0)
 
         now_ts = int(time.time())
-        op_row = {
-            "v_id": data["v_id"],
-            "v_xid": data["v_xid"],
-            "a_id": data["a_id"],
-            "level": data["level"],
-            "name_title": data["v_name"],
-            "keyword": data["keyword"],
-            "rn": data["rn"],
-            "history_status": data["history_status"],
-            "is_repeat": 3,
-            "sort": data["sort"],
-            "createtime": now_ts,
-            "updatetime": now_ts,
-            "operatetime": now_ts,
-            "batch": data["batch"],
-            "machine": data.get("machine_id", 0),
-            "is_piracy": data.get("is_piracy", '3'),
-            "ts_status": 1,
-        }
-
-        vid_row = {
-            "v_id": data["v_id"],
-            "v_xid": data["v_xid"],
-            "v_name": data["v_name"],
-            "title": data["title"],
-            "link": data["link"],
-            "edition": "",
-            "duration": str(data["duration"]) if data.get("duration") else '0',
-            "public_time": data["create_time"],
-            "cover_pic": data["cover_pic"],
-            "sort": data["sort"],
-            "history_status": data.get("history_status", ""),
-            "u_xid": data["u_xid"],
-            "u_id": data["u_id"],
-            "u_pic": data["u_pic"],
-            "u_name": data["u_name"],
-            "status": 1,
-            "createtime": now_ts,
-            "updatetime": now_ts,
-            "operatetime": now_ts,
-            "watch_number": data.get("view", 0),
-            "follow_number": data.get("fans", 0),
-            "video_number": data.get("videos", 0),
-            "ts_status": 1,
-            "is_repeat": 3,
-        }
+        # NOTE: now_ts is part of this key, so the in-memory dedup only
+        # catches repeats pushed within the same second.
+        op_index_key = (
+            data["v_xid"] or "",
+            data["keyword"] or "",
+            now_ts
+        )
+        vid_index_key = (
+            data["v_xid"] or "",
+            data["title"] or ""
+        )
 
         with cls._lock:
+            if op_index_key in cls._existing_op_keys:
+                logger.debug(f"Skipping duplicate op record: {op_index_key}")
+                return
+
+            if vid_index_key in cls._existing_vid_keys:
+                logger.debug(f"Skipping duplicate video record: {vid_index_key}")
+                return
+
+            # Row for the op-record table
+            op_row = {
+                "v_id": data["v_id"],
+                "v_xid": data["v_xid"],
+                "a_id": data["a_id"],
+                "level": data["level"],
+                "name_title": data["title"],
+                "keyword": data["keyword"],
+                "is_repeat": data["is_repeat"],
+                "sort": data["sort"],
+                "createtime": now_ts,
+                "updatetime": now_ts,
+                "operatetime": now_ts,
+                "batch": data["batch"],
+                "machine": data.get("machine_id", 0),
+                "is_piracy": data.get("is_piracy", '3'),
+                "ts_status": data.get("ts_status", 1),
+            }
+
+            # Row for the video table
+            vid_row = {
+                "v_id": data["v_id"],
+                "v_xid": data["v_xid"],
+                "title": data["title"],
+                "link": data["link"],
+                "edition": "",
+                "duration": str(data["duration"]) if data.get("duration") else '0',
+                "public_time": data["create_time"],
+                "cover_pic": data["cover_pic"],
+                "sort": data["sort"],
+                "u_xid": data["u_xid"],
+                "u_id": data["u_id"],
+                "u_pic": data["u_pic"],
+                "u_name": data["u_name"],
+                "status": 1,
+                "createtime": now_ts,
+                "updatetime": now_ts,
+                "operatetime": now_ts,
+                "watch_number": data.get("view", 0),
"follow_number": data.get("fans", 0), + "video_number": data.get("videos", 0), + "is_repeat": 3, + "is_piracy": data.get("is_piracy", 3), + "ts_status": data.get("ts_status", 1), + } + cls._buf_op.append(op_row) cls._buf_vid.append(vid_row) cls._buf_payload.append(data) + cls._existing_op_keys.add(op_index_key) + cls._existing_vid_keys.add(vid_index_key) if len(cls._buf_vid) >= cls.FLUSH_EVERY_ROWS or time.time() - cls._last_flush >= cls.FLUSH_INTERVAL: - logger.info("DBSA 落盘 (ROWS/TIME) ...") + logger.info("落表:达到行数或超时阈值,开始落库") cls.flush() @classmethod @@ -762,17 +790,15 @@ class DBSA: cls._buf_vid.clear() cls._buf_payload.clear() cls._last_flush = time.time() + cls._existing_op_keys.clear() + cls._existing_vid_keys.clear() if not op_rows and not vid_rows: return - logger.info("DBSA 开始 flush ...") - logger.debug(f"flush 操作记录数: {len(op_rows)}") - logger.debug(f"flush 视频记录数: {len(vid_rows)}") - - # 作者表处理 - now_ts = int(time.time()) + # ====== 写作者表 ====== authors_map = {} + now_ts = int(time.time()) for data in payloads: u_xid = data.get("u_xid") if not u_xid: @@ -791,77 +817,76 @@ class DBSA: } author_rows = list(authors_map.values()) - logger.debug(f"flush 作者记录数: {len(author_rows)}") - if author_rows: - stmt = mysql_insert(video_author).values(author_rows) - upd = { - "u_name": stmt.inserted.u_name, - "u_pic": stmt.inserted.u_pic, - "follow_number": stmt.inserted.follow_number, - "v_number": stmt.inserted.v_number, - "pv_number": stmt.inserted.pv_number, - "b_number": stmt.inserted.b_number, - "update_time": stmt.inserted.update_time, + stmt_author = mysql_insert(video_author).values(author_rows) + upd_author = { + "u_name": stmt_author.inserted.u_name, + "u_pic": stmt_author.inserted.u_pic, + "follow_number": stmt_author.inserted.follow_number, + "v_number": stmt_author.inserted.v_number, + "pv_number": stmt_author.inserted.pv_number, + "b_number": stmt_author.inserted.b_number, + "update_time": stmt_author.inserted.update_time, } - ondup = stmt.on_duplicate_key_update(**upd) + ondup_author = stmt_author.on_duplicate_key_update(**upd_author) try: - cls._execute_with_deadlock_retry(ondup) + cls._execute_with_deadlock_retry(ondup_author) except Exception as e: logger.error(f"写作者表失败: {e}") - try: - cls.push_record_many(payloads) - except Exception as re: - logger.error("[Redis 回退失败]", re) + cls.push_record_many(payloads) return - # 操作记录 UPSERT(解决唯一索引冲突) - try: - stmt = mysql_insert(video_op).values(op_rows) - ondup = stmt.on_duplicate_key_update( - updatetime=stmt.inserted.updatetime, - operatetime=stmt.inserted.operatetime, - ts_status=stmt.inserted.ts_status, - is_repeat=stmt.inserted.is_repeat, - history_status=stmt.inserted.history_status, - ) - with _engine.begin() as conn: - conn.execute(ondup) - logger.info(f"插入/更新操作记录: {len(op_rows)} 条") - except Exception as e: - logger.error(f"操作记录 UPSERT 失败: {e}") + # ====== 写操作记录表(UPSERT)====== + if op_rows: try: + stmt = mysql_insert(video_op).values(op_rows) + ondup = stmt.on_duplicate_key_update( + updatetime=stmt.inserted.updatetime, + operatetime=stmt.inserted.operatetime, + ts_status=stmt.inserted.ts_status, + is_repeat=stmt.inserted.is_repeat, + ) + with _engine.begin() as conn: + conn.execute(ondup) + logger.info(f"落表:操作记录 {len(op_rows)} 条") + except Exception as e: + logger.error(f"写操作记录表失败: {e}") cls.push_record_many(payloads) - except Exception as re: - logger.error("[Redis 回退失败]", re) - return + return - # 视频表 UPSERT - try: - stmt = mysql_insert(video).values(vid_rows) - upd = { - "title": stmt.inserted.title, - "duration": 
-                "cover_pic": stmt.inserted.cover_pic,
-                "sort": stmt.inserted.sort,
-                "updatetime": stmt.inserted.updatetime,
-                "watch_number": stmt.inserted.watch_number,
-                "follow_number": stmt.inserted.follow_number,
-                "video_number": stmt.inserted.video_number,
-                "is_repeat": stmt.inserted.is_repeat,
-                "ts_status": stmt.inserted.ts_status,
-            }
-            ondup = stmt.on_duplicate_key_update(**upd)
-            with _engine.begin() as conn:
-                conn.execute(ondup)
-            logger.info(
-                f"[DBSA] flush done: authors={len(author_rows)}, op={len(op_rows)}, video={len(vid_rows)}")
-        except Exception as e:
-            logger.error(f"Failed to write video table: {e}")
-            try:
-                cls.push_record_many(payloads)
-            except Exception as re:
-                logger.error("[Redis fallback failed]", re)
+        # ====== Write video table (UPSERT) ======
+        if vid_rows:
+            try:
+                stmt = mysql_insert(video).values(vid_rows)
+                upd = {
+                    "title": stmt.inserted.title,
+                    "link": stmt.inserted.link,
+                    "is_piracy": stmt.inserted.is_piracy,
+                    "edition": stmt.inserted.edition,
+                    "duration": stmt.inserted.duration,
+                    "watch_number": stmt.inserted.watch_number,
+                    "follow_number": stmt.inserted.follow_number,
+                    "video_number": stmt.inserted.video_number,
+                    "public_time": stmt.inserted.public_time,
+                    "cover_pic": stmt.inserted.cover_pic,
+                    "sort": stmt.inserted.sort,
+                    "u_xid": stmt.inserted.u_xid,
+                    "u_id": stmt.inserted.u_id,
+                    "u_pic": stmt.inserted.u_pic,
+                    "u_name": stmt.inserted.u_name,
+                    "status": stmt.inserted.status,
+                    "ts_status": stmt.inserted.ts_status,
+                    "updatetime": stmt.inserted.updatetime,
+                    "operatetime": stmt.inserted.operatetime,
+                    "is_repeat": stmt.inserted.is_repeat,
+                }
+                ondup = stmt.on_duplicate_key_update(**upd)
+                with _engine.begin() as conn:
+                    conn.execute(ondup)
+                logger.info(f"Flushed video records: {len(vid_rows)}")
+            except Exception as e:
+                logger.error(f"Failed to write video table: {e}")
+                cls.push_record_many(payloads)
 
     @classmethod
     def _execute_with_deadlock_retry(cls, statement):
@@ -870,9 +895,8 @@ class DBSA:
                 with _engine.begin() as conn:
                     conn.execute(statement)
                 return
-            except OperationalError as e:
-                code = getattr(e.orig, "args", [None])[0]
-                if code == 1213 and attempt < 2:
+            except Exception as e:
+                # Guard the access: e.orig exists only on SQLAlchemy DBAPI
+                # errors; other exceptions should re-raise cleanly.
+                code = getattr(getattr(e, "orig", None), "args", [None])[0]
+                if code == 1213 and attempt < 2:
                     time.sleep(0.5 * (attempt + 1))
                     continue
-                raise
\ No newline at end of file
+                raise
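
Reviewer note: the patch leans on two mechanisms — batched `INSERT ... ON DUPLICATE KEY UPDATE` built via `mysql_insert(...).on_duplicate_key_update(...)`, and a retry loop that backs off only on MySQL deadlocks (error code 1213). Below is a minimal, self-contained sketch of both; the `demo_video` table, its columns, and the DSN are illustrative assumptions, not the project's real schema.

```python
import time

from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.exc import OperationalError

metadata = MetaData()

# Stand-in table: one unique key (v_xid) so ON DUPLICATE KEY UPDATE can fire.
demo_video = Table(
    "demo_video", metadata,
    Column("v_xid", String(64), primary_key=True),
    Column("title", String(255)),
    Column("updatetime", Integer),
)

# Placeholder DSN; substitute real credentials.
engine = create_engine("mysql+pymysql://user:pass@localhost/demo")


def upsert_with_retry(rows, retries=3):
    """Batch-upsert `rows`, retrying only on MySQL deadlock (error 1213)."""
    stmt = mysql_insert(demo_video).values(rows)
    # stmt.inserted.<col> refers to the VALUES(...) of the conflicting row,
    # so an existing row is overwritten with the freshly batched values.
    ondup = stmt.on_duplicate_key_update(
        title=stmt.inserted.title,
        updatetime=stmt.inserted.updatetime,
    )
    for attempt in range(retries):
        try:
            with engine.begin() as conn:
                conn.execute(ondup)
            return
        except OperationalError as e:
            # e.orig is the DB-API exception; args[0] is MySQL's error code.
            code = getattr(getattr(e, "orig", None), "args", [None])[0]
            if code == 1213 and attempt < retries - 1:
                time.sleep(0.5 * (attempt + 1))  # linear backoff, as in the patch
                continue
            raise


# Usage: one round trip per batch, duplicates resolved by the unique key.
upsert_with_retry([
    {"v_xid": "x1", "title": "first", "updatetime": int(time.time())},
    {"v_xid": "x2", "title": "second", "updatetime": int(time.time())},
])
```

The guarded `getattr(getattr(e, "orig", None), ...)` chain mirrors the fix suggested in the last hunk: once the handler catches bare `Exception`, an unguarded `e.orig` would itself raise `AttributeError` for exceptions that are not SQLAlchemy DBAPI wrappers.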