import pandas as pd
from datetime import datetime
from sqlalchemy import create_engine, text

# MySQL connection settings.
DB_CONFIG = {
    "host": "192.144.230.75",
    "port": 3306,
    "user": "db_vidcon",
    "password": "rexdK4fhCCiRE4BZ",
    "database": "db_vidcon",
    "charset": "utf8mb4",
}

URL = (
    f"mysql+pymysql://{DB_CONFIG['user']}:{DB_CONFIG['password']}"
    f"@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"
    f"?charset={DB_CONFIG['charset']}"
)

engine = create_engine(URL, pool_pre_ping=True)


# SQL that refreshes is_repeat; run it before exporting.
def update_is_repeat(batches: list[int]) -> None:
    """Within the given batches, mark each v_xid: 1 if it appears once, 2 if duplicated."""
    sql = text("""
        UPDATE sh_dm_video_op_v2 AS op
        JOIN (
            SELECT v_xid, COUNT(*) AS cnt
            FROM sh_dm_video_op_v2
            WHERE batch IN :batches
            GROUP BY v_xid
        ) AS agg ON op.v_xid = agg.v_xid
        SET op.is_repeat = CASE WHEN agg.cnt = 1 THEN 1 ELSE 2 END
        WHERE op.batch IN :batches;
    """)
    with engine.begin() as conn:
        conn.execute(sql, {"batches": tuple(batches)})
    print(f"Updated is_repeat for batches {batches}.")


def get_rn_list() -> list[str]:
    """Return every distinct region (rn) present in sh_dm_video_op_v2."""
    sql = "SELECT DISTINCT rn FROM sh_dm_video_op_v2;"
    with engine.connect() as conn:
        result = conn.execute(text(sql))
        return [row[0] for row in result]


def fetch_all_data_for_rn(rn: str, batches: list[int]) -> pd.DataFrame:
    """Pull all rows for one region and the given batches, in 10,000-row chunks."""
    # The Chinese column aliases are intentional: they become the header row
    # of the exported Excel file.
    sql = text(
        """
        SELECT
            op.id AS ID,
            v.v_name AS 片名,
            v.link AS 视频连接,
            v.is_piracy AS 是否盗版,
            op.`level` AS 优先级,
            op.rn AS 地区,
            NULL AS 投诉日期,
            NULL AS 下线日期,
            op.keyword AS 关键词,
            v.title AS 标题,
            v.duration AS 时长,
            v.watch_number AS 观看数量,
            v.public_time AS 上传时间,
            v.u_pic AS 头像,
            v.is_repeat AS 是否重复,
            op.sort AS 排序,
            op.batch AS 批次,
            op.machine AS 机器号,
            v.u_id AS 用户id,
            v.u_xid AS u_xid,
            v.u_name AS 用户名称
        FROM sh_dm_video_op_v2 AS op
        LEFT JOIN sh_dm_video_v2 AS v ON op.v_xid = v.v_xid
        WHERE op.rn = :rn
          AND op.batch IN :batches
        ORDER BY op.id
        """
    )
    chunks = pd.read_sql_query(
        sql,
        engine,
        params={"rn": rn, "batches": tuple(batches)},
        chunksize=10000,
    )
    dfs = []
    for i, chunk in enumerate(chunks, start=1):
        print(f"[{rn}] Fetching chunk {i}, {len(chunk)} rows")
        dfs.append(chunk)
    df = pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()
    print(f"[{rn}] Fetch complete, {len(df)} rows in total")
    return df


def export_all():
    # Batches to process.
    batches = [1748965168, 1749049335]
    # Refresh is_repeat first.
    # update_is_repeat(batches)
    rn_list = get_rn_list()
    timestamp = datetime.now().strftime("%Y%m%d")
    for rn in rn_list:
        print(f"Processing region: {rn}")
        df = fetch_all_data_for_rn(rn, batches)
        if df.empty:
            print(f"[{rn}] No data, skipping export")
            continue
        safe_rn = rn.replace(" ", "_")
        filename = f"{timestamp}_T0T1_{safe_rn}.xlsx"
        print(f"[{rn}] Exporting to file: {filename} …")
        df.to_excel(filename, index=False)
        print(f"[{rn}] Export complete\n")


if __name__ == "__main__":
    export_all()
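

# --- Optional sketch (not used above): portable IN-list binding -------------
# The `IN :batches` pattern in the queries above appears to rely on pymysql
# escaping a tuple bind value into a parenthesized list. The driver-neutral
# way to bind an IN list with SQLAlchemy 1.4+ is an "expanding" bindparam.
# count_rows_in_batches() below is a hypothetical helper illustrating that
# technique; it is not part of the export flow.
from sqlalchemy import bindparam


def count_rows_in_batches(batches: list[int]) -> int:
    """Count sh_dm_video_op_v2 rows in the given batches via an expanding IN bind."""
    stmt = text(
        "SELECT COUNT(*) FROM sh_dm_video_op_v2 WHERE batch IN :batches"
    ).bindparams(bindparam("batches", expanding=True))
    with engine.connect() as conn:
        # The list value is expanded into individual placeholders at execution time.
        return conn.execute(stmt, {"batches": list(batches)}).scalar_one()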