feat: 添加select导入以支持查询操作

This commit is contained in:
晓丰 2025-06-29 11:36:36 +08:00
parent 65eada4a6e
commit b511dbc962

122
DB.py
View File

@ -802,26 +802,24 @@ class DBSA:
# 生成所有视频唯一键 (v_xid, v_name)
all_vid_keys = list({(row["v_xid"], row["v_name"]) for row in vid_rows})
conn = _engine.connect()
try:
# 查询数据库中已存在的视频记录
sel_vid = select([
video.c.v_xid,
video.c.v_name
]).where(
tuple_(video.c.v_xid, video.c.v_name).in_(all_vid_keys)
)
result = conn.execute(sel_vid)
vid_existing_keys = {(row.v_xid, row.v_name) for row in result}
except Exception as e:
logger.error(f"查询视频表时异常: {e}")
if all_vid_keys: # 仅当有键时才查询
conn = _engine.connect()
try:
cls.push_record_many(payloads)
except Exception as re:
logger.error("[Redis 回退失败]", re)
return
finally:
conn.close()
# 使用SQLAlchemy 1.4+的select语法
sel_vid = select(video.c.v_xid, video.c.v_name).where(
tuple_(video.c.v_xid, video.c.v_name).in_(all_vid_keys)
)
result = conn.execute(sel_vid)
vid_existing_keys = {(row.v_xid, row.v_name) for row in result}
except Exception as e:
logger.error(f"查询视频表时异常: {e}")
try:
cls.push_record_many(payloads)
except Exception as re:
logger.error("[Redis 回退失败]", re)
return
finally:
conn.close()
# 设置重复标志
for i, vid_row in enumerate(vid_rows):
@ -834,55 +832,59 @@ class DBSA:
# ========== 操作记录表去重 ==========
final_op_rows = []
if op_rows:
try:
# 生成所有操作记录索引键
op_check_keys = [
(row["rn"], row["name_title"], row["keyword"], row["v_xid"], row["createtime"])
for row in op_rows
]
# 生成所有操作记录索引键
op_check_keys = [
(row["rn"], row["name_title"], row["keyword"], row["v_xid"], row["createtime"])
for row in op_rows
]
# 查询数据库已存在的操作记录
conn = _engine.connect()
stmt = select([
video_op.c.rn,
video_op.c.name_title,
video_op.c.keyword,
video_op.c.v_xid,
video_op.c.createtime
]).where(
tuple_(
if op_check_keys: # 仅当有键时才查询
try:
# 查询数据库已存在的操作记录
conn = _engine.connect()
# 使用SQLAlchemy 1.4+的select语法
stmt = select(
video_op.c.rn,
video_op.c.name_title,
video_op.c.keyword,
video_op.c.v_xid,
video_op.c.createtime
).in_(op_check_keys)
)
result = conn.execute(stmt)
existing_db_op_keys = {tuple(row) for row in result}
conn.close()
except Exception as e:
logger.error(f"查询操作记录表失败: {e}")
try:
cls.push_record_many(payloads)
except Exception as re:
logger.error("Redis退回失败", re)
return
).where(
tuple_(
video_op.c.rn,
video_op.c.name_title,
video_op.c.keyword,
video_op.c.v_xid,
video_op.c.createtime
).in_(op_check_keys)
)
result = conn.execute(stmt)
existing_db_op_keys = {tuple(row) for row in result}
conn.close()
except Exception as e:
logger.error(f"查询操作记录表失败: {e}")
try:
cls.push_record_many(payloads)
except Exception as re:
logger.error("Redis退回失败", re)
return
# 过滤重复操作记录
for idx, row in enumerate(op_rows):
index_key = (
row["rn"],
row["name_title"],
row["keyword"],
row["v_xid"],
row["createtime"]
)
# 过滤重复操作记录
for idx, row in enumerate(op_rows):
index_key = (
row["rn"],
row["name_title"],
row["keyword"],
row["v_xid"],
row["createtime"]
)
# 检查是否在数据库或当前批次中重复
if index_key not in existing_db_op_keys and index_key not in existing_op_keys_batch:
final_op_rows.append(row)
existing_op_keys_batch.add(index_key)
# 检查是否在数据库或当前批次中重复
if index_key not in existing_db_op_keys and index_key not in existing_op_keys_batch:
final_op_rows.append(row)
existing_op_keys_batch.add(index_key)
else:
final_op_rows = op_rows[:] # 如果没有键,直接使用所有行
# ========== 作者表处理 ==========
authors_map = {}