fix: 添加更新 is_repeat 字段的功能并修改数据获取函数以支持批次参数

This commit is contained in:
晓丰 2025-05-18 23:54:14 +08:00
parent 0a1d4492ee
commit 90ad9c28ff

View File

@ -19,6 +19,27 @@ URL = (
engine = create_engine(URL, pool_pre_ping=True) engine = create_engine(URL, pool_pre_ping=True)
# Refresh the is_repeat flag for the selected batches before exporting.
def update_is_repeat(batches: list[int]) -> None:
    """Recompute ``is_repeat`` on ``sh_dm_video_op_v2`` for the given batches.

    Rows are grouped by ``v_xid`` within the selected batches. A row whose
    ``v_xid`` occurs exactly once in that set gets ``is_repeat = 1``; every
    other row gets ``is_repeat = 2``. Only rows belonging to ``batches`` are
    modified. The statement runs inside ``engine.begin()``, so it is
    committed on success and rolled back if execution raises.

    Args:
        batches: Batch identifiers to process. An empty list is a no-op,
            because ``IN ()`` is not valid SQL.

    NOTE(review): this updates ``op.is_repeat``, while the export query in
    this file selects ``v.is_repeat`` from ``sh_dm_video_v2`` -- confirm
    that the exported column is the one intended to reflect this update.
    """
    if not batches:
        print("批次列表为空,跳过 is_repeat 更新。")
        return

    # Imported locally so this block stays self-contained; sqlalchemy is
    # already a dependency of this module (create_engine / text).
    from sqlalchemy import bindparam

    # An expanding bind parameter lets SQLAlchemy render one placeholder per
    # batch id instead of relying on the DBAPI driver to inline a tuple.
    sql = text(
        """
        UPDATE sh_dm_video_op_v2 AS op
        JOIN (
            SELECT v_xid, COUNT(*) AS cnt
            FROM sh_dm_video_op_v2
            WHERE batch IN :batches
            GROUP BY v_xid
        ) AS agg
            ON op.v_xid = agg.v_xid
        SET op.is_repeat = CASE
            WHEN agg.cnt = 1 THEN 1
            ELSE 2
        END
        WHERE op.batch IN :batches;
        """
    ).bindparams(bindparam("batches", expanding=True))

    with engine.begin() as conn:
        conn.execute(sql, {"batches": list(batches)})
    print(f"已更新批次 {batches} 的 is_repeat 字段。")
def get_rn_list() -> list[str]: def get_rn_list() -> list[str]:
sql = "SELECT DISTINCT rn FROM sh_dm_video_op_v2;" sql = "SELECT DISTINCT rn FROM sh_dm_video_op_v2;"
@ -27,42 +48,43 @@ def get_rn_list() -> list[str]:
return [row[0] for row in result] return [row[0] for row in result]
def fetch_all_data_for_rn(rn: str) -> pd.DataFrame: def fetch_all_data_for_rn(rn: str, batches: list[int]) -> pd.DataFrame:
sql = """ sql = text(
SELECT """
op.id AS ID, SELECT
v.v_name AS 片名, op.id AS ID,
v.link AS 视频连接, v.v_name AS 片名,
v.is_piracy AS 是否盗版, v.link AS 视频连接,
op.`level` AS 优先级, v.is_piracy AS 是否盗版,
op.rn AS 地区, op.`level` AS 优先级,
NULL AS 投诉日期, op.rn AS 地区,
NULL AS 下线日期, NULL AS 投诉日期,
op.keyword AS 关键词, NULL AS 下线日期,
v.title AS 标题, op.keyword AS 关键词,
v.duration AS 时长, v.title AS 标题,
v.watch_number AS 观看数量, v.duration AS 时长,
v.public_time AS 上传时间, v.watch_number AS 观看数量,
v.u_pic AS 头像, v.public_time AS 上传时间,
v.is_repeat AS 是否重复, v.u_pic AS 头像,
op.sort AS 排序, v.is_repeat AS 是否重复,
op.batch AS 批次, op.sort AS 排序,
op.machine AS 机器号, op.batch AS 批次,
v.u_id AS 用户id, op.machine AS 机器号,
v.u_xid AS u_xid, v.u_id AS 用户id,
v.u_name AS 用户名称 v.u_xid AS u_xid,
FROM sh_dm_video_op_v2 AS op v.u_name AS 用户名称
LEFT JOIN sh_dm_video_v2 AS v FROM sh_dm_video_op_v2 AS op
ON op.v_xid = v.v_xid LEFT JOIN sh_dm_video_v2 AS v
WHERE op.rn = %s ON op.v_xid = v.v_xid
AND op.batch IN (1747324254, 1747323990) WHERE op.rn = :rn
ORDER BY op.id AND op.batch IN :batches
""" ORDER BY op.id
# 注意params 用列表或元组 """
)
chunks = pd.read_sql_query( chunks = pd.read_sql_query(
sql, sql,
engine, engine,
params=(rn,), params={"rn": rn, "batches": tuple(batches)},
chunksize=10000 chunksize=10000
) )
dfs = [] dfs = []
@ -75,11 +97,16 @@ def fetch_all_data_for_rn(rn: str) -> pd.DataFrame:
def export_all(): def export_all():
# 指定要处理的批次
batches = [1747324254, 1747323990]
# 先更新 is_repeat
update_is_repeat(batches)
rn_list = get_rn_list() rn_list = get_rn_list()
timestamp = datetime.now().strftime("%Y%m%d") timestamp = datetime.now().strftime("%Y%m%d")
for rn in rn_list: for rn in rn_list:
print(f"开始处理地区:{rn}") print(f"开始处理地区:{rn}")
df = fetch_all_data_for_rn(rn) df = fetch_all_data_for_rn(rn, batches)
if df.empty: if df.empty:
print(f"[{rn}] 无数据,跳过导出") print(f"[{rn}] 无数据,跳过导出")
continue continue