import pandas as pd
from datetime import datetime
from sqlalchemy import bindparam, create_engine, text

DB_CONFIG = {
    "host": "192.144.230.75",
    "port": 3306,
    "user": "db_vidcon",
    "password": "rexdK4fhCCiRE4BZ",
    "database": "db_vidcon",
    "charset": "utf8mb4",
}

URL = (
    f"mysql+pymysql://{DB_CONFIG['user']}:{DB_CONFIG['password']}"
    f"@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"
    f"?charset={DB_CONFIG['charset']}"
)

# pool_pre_ping probes each pooled connection before handing it out, so
# connections dropped by the MySQL server are replaced transparently.
engine = create_engine(URL, pool_pre_ping=True)

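
# Not in the original script: a minimal connectivity probe, assuming you want
# to fail fast on a bad host or credentials before any batch work starts.
# "SELECT 1" is a generic health-check query.
def check_connection() -> None:
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))
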
# Refresh is_repeat before exporting: within the selected batches, a v_xid
# that appears once is flagged 1, a duplicated v_xid is flagged 2.
def update_is_repeat(batches: list[int]):
    # expanding=True renders "IN :batches" as a proper "IN (...)" list at
    # execution time; this is the idiomatic SQLAlchemy way to bind a sequence.
    sql = text("""
        UPDATE sh_dm_video_op_v2 AS op
        JOIN (
            SELECT v_xid, COUNT(*) AS cnt
            FROM sh_dm_video_op_v2
            WHERE batch IN :batches
            GROUP BY v_xid
        ) AS agg
            ON op.v_xid = agg.v_xid
        SET op.is_repeat = CASE
            WHEN agg.cnt = 1 THEN 1
            ELSE 2
        END
        WHERE op.batch IN :batches;
    """).bindparams(bindparam("batches", expanding=True))
    with engine.begin() as conn:
        conn.execute(sql, {"batches": list(batches)})
    print(f"Updated the is_repeat field for batches {batches}.")

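
# Not in the original script: an optional sanity check that tallies rows per
# is_repeat value for the given batches, e.g. to eyeball the effect of
# update_is_repeat() right after it runs.
def count_is_repeat(batches: list[int]) -> dict:
    sql = text("""
        SELECT is_repeat, COUNT(*) AS cnt
        FROM sh_dm_video_op_v2
        WHERE batch IN :batches
        GROUP BY is_repeat;
    """).bindparams(bindparam("batches", expanding=True))
    with engine.connect() as conn:
        result = conn.execute(sql, {"batches": list(batches)})
        return {row[0]: row[1] for row in result}
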
def get_rn_list() -> list[str]:
    sql = "SELECT DISTINCT rn FROM sh_dm_video_op_v2;"
    with engine.connect() as conn:
        result = conn.execute(text(sql))
        return [row[0] for row in result]

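
# Not in the original script: a variant sketch that restricts the region list
# to the selected batches, assuming you only care about regions that actually
# have rows there; it avoids the empty-DataFrame round trips that export_all
# otherwise skips one by one.
def get_rn_list_for_batches(batches: list[int]) -> list[str]:
    sql = text(
        "SELECT DISTINCT rn FROM sh_dm_video_op_v2 WHERE batch IN :batches"
    ).bindparams(bindparam("batches", expanding=True))
    with engine.connect() as conn:
        result = conn.execute(sql, {"batches": list(batches)})
        return [row[0] for row in result]
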
def fetch_all_data_for_rn(rn: str, batches: list[int]) -> pd.DataFrame:
    # The Chinese column aliases are kept as-is on purpose: they become the
    # header row of the exported Excel report.
    sql = text(
        """
        SELECT
            op.id AS ID,
            v.v_name AS 片名,
            v.link AS 视频连接,
            v.is_piracy AS 是否盗版,
            op.`level` AS 优先级,
            op.rn AS 地区,
            NULL AS 投诉日期,
            NULL AS 下线日期,
            op.keyword AS 关键词,
            v.title AS 标题,
            v.duration AS 时长,
            v.watch_number AS 观看数量,
            v.public_time AS 上传时间,
            v.u_pic AS 头像,
            v.is_repeat AS 是否重复,
            op.sort AS 排序,
            op.batch AS 批次,
            op.machine AS 机器号,
            v.u_id AS 用户id,
            v.u_xid AS u_xid,
            v.u_name AS 用户名称
        FROM sh_dm_video_op_v2 AS op
        LEFT JOIN sh_dm_video_v2 AS v
            ON op.v_xid = v.v_xid
        WHERE op.rn = :rn
          AND op.batch IN :batches
        ORDER BY op.id
        """
    ).bindparams(bindparam("batches", expanding=True))
    # Stream the result in 10k-row chunks to keep memory bounded.
    chunks = pd.read_sql_query(
        sql,
        engine,
        params={"rn": rn, "batches": list(batches)},
        chunksize=10000,
    )
    dfs = []
    for i, chunk in enumerate(chunks, start=1):
        print(f"[{rn}] Fetching chunk {i}, {len(chunk)} rows")
        dfs.append(chunk)
    df = pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()
    print(f"[{rn}] Fetch complete: {len(df)} rows in total")
    return df

def export_all():
    # Batches to process.
    batches = [1747324254, 1747323990]
    # Refresh is_repeat first so the export reflects current duplicates.
    update_is_repeat(batches)

    rn_list = get_rn_list()
    timestamp = datetime.now().strftime("%Y%m%d")
    for rn in rn_list:
        print(f"Processing region: {rn}")
        df = fetch_all_data_for_rn(rn, batches)
        if df.empty:
            print(f"[{rn}] No data, skipping export")
            continue
        safe_rn = rn.replace(" ", "_")
        filename = f"{timestamp}_T0T1_{safe_rn}.xlsx"
        print(f"[{rn}] Exporting to file: {filename} …")
        df.to_excel(filename, index=False)
        print(f"[{rn}] Export complete\n")

if __name__ == "__main__":
    export_all()
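
# Not in the original script: a variant sketch that writes every region into a
# single workbook, one sheet per region, instead of one file per region.
# Assumes openpyxl is installed (pandas' default .xlsx engine); sheet names
# are truncated to Excel's 31-character limit.
def export_all_single_workbook(batches: list[int], path: str) -> None:
    update_is_repeat(batches)
    with pd.ExcelWriter(path) as writer:
        for rn in get_rn_list():
            df = fetch_all_data_for_rn(rn, batches)
            if df.empty:
                continue
            sheet = rn.replace(" ", "_")[:31]
            df.to_excel(writer, sheet_name=sheet, index=False)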