"""Export video-operation rows from MySQL into one Excel workbook per region.

Rows of ``sh_dm_video_op_v2`` are left-joined to ``sh_dm_video_v2`` on
``v_xid``, restricted to a fixed pair of batches, and written to
``<YYYYMMDD>_T0T1_<region>.xlsx`` in the current working directory.
"""
import os
import re
from datetime import datetime
from urllib.parse import quote

import pandas as pd
from sqlalchemy import create_engine, text

# Connection settings.  The "password" and "database" keys are required by
# URL below; their values are taken from the environment so that no secret
# has to live in version control.
DB_CONFIG = {
    "host": "192.144.230.75",
    "port": 3306,
    "user": "db_vidcon",
    "password": os.environ.get("MYSQL_PASSWORD", ""),
    "database": os.environ.get("MYSQL_DATABASE", ""),
    "charset": "utf8mb4",
}

# User and password are percent-encoded: a raw "@", ":", "/" or "%" inside a
# credential would otherwise be parsed as URL structure.
URL = (
    f"mysql+pymysql://{quote(str(DB_CONFIG['user']), safe='')}"
    f":{quote(str(DB_CONFIG['password']), safe='')}"
    f"@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"
    f"?charset={DB_CONFIG['charset']}"
)

# pool_pre_ping makes the pool test a connection before handing it out.
engine = create_engine(URL, pool_pre_ping=True)

# Whitespace plus the characters that are path separators or are rejected in
# file names on common platforms.
_UNSAFE_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\s]')


def get_rn_list() -> list[str]:
    """Return every distinct ``rn`` value found in ``sh_dm_video_op_v2``."""
    sql = "SELECT DISTINCT rn FROM sh_dm_video_op_v2;"
    with engine.connect() as conn:
        result = conn.execute(text(sql))
        # Consume the result while the connection is still open.
        return [row[0] for row in result]


def fetch_all_data_for_rn(rn: str) -> pd.DataFrame:
    """Fetch all export rows for one region.

    The query is read in chunks of 10000 rows, printing progress per chunk,
    and the chunks are concatenated into a single frame.

    Args:
        rn: Region value matched against ``op.rn``.

    Returns:
        A DataFrame with one row per matching ``sh_dm_video_op_v2`` record,
        ordered by ``op.id``; an empty DataFrame when nothing matches.
    """
    sql = """
        SELECT
            op.id AS ID,
            v.v_name AS 片名,
            v.link AS 视频连接,
            v.is_piracy AS 是否盗版,
            op.`level` AS 优先级,
            op.rn AS 地区,
            NULL AS 投诉日期,
            NULL AS 下线日期,
            op.keyword AS 关键词,
            v.title AS 标题,
            v.duration AS 时长,
            v.watch_number AS 观看数量,
            v.public_time AS 上传时间,
            v.u_pic AS 头像,
            v.is_repeat AS 是否重复,
            op.sort AS 排序,
            op.batch AS 批次,
            op.machine AS 机器号,
            v.u_id AS 用户id,
            v.u_xid AS u_xid,
            v.u_name AS 用户名称
        FROM sh_dm_video_op_v2 AS op
        LEFT JOIN sh_dm_video_v2 AS v
            ON op.v_xid = v.v_xid
        WHERE op.rn = %s
          AND op.batch IN (1747324254, 1747323990)
        ORDER BY op.id
    """
    # params must be a list or tuple, hence the one-element tuple.
    chunks = pd.read_sql_query(
        sql,
        engine,
        params=(rn,),
        chunksize=10000,
    )
    dfs = []
    for i, chunk in enumerate(chunks, start=1):
        print(f"[{rn}] 正在拉取第 {i} 块数据,行数:{len(chunk)}")
        dfs.append(chunk)
    # pd.concat raises on an empty list, so fall back to an empty frame.
    df = pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()
    print(f"[{rn}] 全部拉取完成,共 {len(df)} 行")
    return df


def export_all():
    """Write one Excel workbook per region, skipping regions with no rows."""
    rn_list = get_rn_list()
    # Computed once so every file of a run carries the same date prefix.
    timestamp = datetime.now().strftime("%Y%m%d")
    for rn in rn_list:
        print(f"开始处理地区:{rn}")
        df = fetch_all_data_for_rn(rn)
        if df.empty:
            print(f"[{rn}] 无数据,跳过导出")
            continue
        # Replacing only spaces is not enough: a "/" or "\\" in the region
        # name would change the target path and ":" etc. are invalid on
        # some platforms.
        safe_rn = _UNSAFE_FILENAME_CHARS.sub("_", str(rn))
        filename = f"{timestamp}_T0T1_{safe_rn}.xlsx"
        print(f"[{rn}] 导出到文件:{filename} …")
        df.to_excel(filename, index=False)
        print(f"[{rn}] 导出完成\n")


if __name__ == "__main__":
    export_all()