167 lines
5.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import os
from datetime import datetime
from sqlalchemy import (
create_engine, MetaData, Table, Column,Integer,
BigInteger, String, Text, DateTime, text # <-- 导入 text
)
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.engine.url import URL
DB_USER = os.getenv('DB_USER', 'tsreshub_prod')
DB_PASS = os.getenv('DB_PASS', 'Tr5h$Prod!92@TsRH')
DB_HOST = os.getenv('DB_HOST', '39.101.135.56')
DB_PORT = int(os.getenv('DB_PORT', 3306))
DB_NAME = os.getenv('DB_NAME', 'tsreshub_db')
db_url = URL.create(
drivername="mysql+pymysql",
username=DB_USER,
password=DB_PASS,
host=DB_HOST,
port=DB_PORT,
database=DB_NAME,
query={"charset": "utf8mb4"}
)
engine = create_engine(db_url, echo=False, pool_pre_ping=True)
metadata = MetaData()
companies = Table(
'companies_company', metadata,
Column('id', BigInteger, primary_key=True, autoincrement=True),
Column('name', String(200), nullable=False, unique=True),
Column('category', String(100)),
Column('size', String(50)),
Column('company_type', String(100)),
Column('founded_date', String(100)),
Column('introduction', Text, nullable=False),
Column('address', String(300), nullable=False),
Column('benefits', Text),
Column('website_id', BigInteger),
Column('created_at', DateTime, default=datetime.utcnow),
Column('updated_at', DateTime, default=datetime.utcnow, onupdate=datetime.utcnow),
)
resumes = Table(
'resumes_resumebasic', metadata,
Column('id', BigInteger, primary_key=True, autoincrement=True),
Column('resume_id', Integer, nullable=False, index=True),
Column('name', String(255)),
Column('job_region', String(255)),
Column('birthday', String(255)),
Column('education', String(255)),
Column('school', String(255)),
Column('expected_position', String(255)),
Column('last_active_time', String(255)),
Column('marital_status', String(255)),
Column('current_location', String(255)),
Column('age', Integer),
Column('phone', String(255)),
Column('gender', String(255)),
Column('job_type', String(255)),
Column('job_status', String(255)),
Column('work_1_experience', Text),
Column('work_1_time', String(255)),
Column('work_1_description', Text),
Column('work_2_experience', Text),
Column('work_2_time', String(255)),
Column('work_2_description', Text),
Column('work_3_experience', Text),
Column('work_3_time', String(255)),
Column('work_3_description', Text),
Column('work_4_experience', Text),
Column('work_4_time', String(255)),
Column('work_4_description', Text),
Column('height', Integer),
Column('weight', Integer),
Column('work_years', String(255)),
Column('highest_education', String(255)),
Column('ethnicity', String(255)),
Column('update_time', DateTime),
Column('job_function', String(255)),
Column('intended_position', String(255)),
Column('industry', String(255)),
Column('expected_salary', String(255)),
Column('available_time', String(255)),
Column('job_property', String(255)),
Column('job_location', String(255)),
Column('crawl_keywords', String(255)),
Column('source_id', BigInteger), # 外键指向 websites_website(id)
Column('created_at', DateTime, default=datetime.utcnow),
Column('updated_at', DateTime, default=datetime.utcnow, onupdate=datetime.utcnow),
)
class DB:
@classmethod
def insert_resume(cls, data: dict):
safe = {
k: v for k, v in data.items()
if k in resumes.c and isinstance(v, (str, int, float, type(None), datetime))
}
resume_id = safe.get('resume_id')
source_id = safe.get('source_id')
if resume_id is None or source_id is None:
print("❌ 必须提供 resume_id 和 source_id才可插入")
return
stmt = mysql_insert(resumes).values(**safe)
update_cols = {
col.name: stmt.inserted[col.name]
for col in resumes.c
if col.name not in ('id', 'created_at')
}
stmt = stmt.on_duplicate_key_update(**update_cols)
with engine.begin() as conn:
conn.execute(stmt)
print(f"✅ 简历插入/更新成功resume_id={resume_id}, source_id={source_id}")
@classmethod
def insert_company(cls, data: dict):
safe = {
k: v for k, v in data.items()
if k in companies.c and isinstance(v, (str, int, float, type(None), datetime))
}
if 'website' in safe:
safe['website_id'] = safe.pop('website')
if not {'name', 'website_id'}.issubset(safe):
print("❌ 缺少 name 或 website_id无法插入")
return
stmt = mysql_insert(companies).values(**safe)
update_cols = {
col.name: stmt.inserted[col.name]
for col in companies.c
if col.name not in ('id', 'created_at')
}
stmt = stmt.on_duplicate_key_update(**update_cols)
with engine.begin() as conn:
conn.execute(stmt)
print(f"✅ 插入/更新成功:{safe['name']}")
if __name__ == '__main__':
print("→ 尝试连接数据库…")
try:
with engine.connect() as conn:
conn.execute(text("SELECT 1"))
print("✅ 数据库连接成功")
except Exception as e:
print(f"❌ 无法连接数据库:{e}")
exit(1)
test_data = {
'name': '河北遵一建设工程有限公司',
'category': '房地产/建筑/工程',
'size': '20-100人',
'company_type': '民营',
'founded_date': '',
'introduction': '河北遵一建设工程有限公司是一家诚信经营、具有良好口碑的建设工程公司……',
'address': '领袖嘉园西门口对面',
'benefits': '',
'website_id': 1,
}
DB.insert_company(test_data)