diff --git a/TS_resume_spider/pipelines.py b/TS_resume_spider/pipelines.py
index 74562f6..25dd6c0 100644
--- a/TS_resume_spider/pipelines.py
+++ b/TS_resume_spider/pipelines.py
@@ -59,7 +59,7 @@ class YTSpiderPipeline:
         return datetime(2019, 12, 12)
 
     def process_item(self, item, spider):
-        if spider.name not in ['yutian_top','fnrc_vip']:
+        if spider.name not in ['yutian_top', 'fnrc_vip']:
             return item
         experience = item.get("experience", [])
         for j in range(4):
@@ -104,7 +104,7 @@ class YTSpiderPipeline:
 
 class YTSavePipeline:
     def process_item(self, item, spider):
-        if spider.name not in ['yutian_top' ,'zhrczp_com', 'fnrc_vip', 'qj050_com']:
+        if spider.name not in ['yutian_top', 'zhrczp_com', 'fnrc_vip', 'qj050_com']:
             return item
         resume_id = item.get("resume_id")
         if not resume_id:
@@ -117,26 +117,24 @@ class YTSavePipeline:
 
         return item
 
+
 class CompanySavePipeline:
     def process_item(self, item, spider):
-        def process_item(self, item, spider):
-            if spider.name not in ['zhrczp_com_compary']:
-                return item
+        if spider.name not in ['zhrczp_com_compary']:
+            return item
 
-            # Field mapping
-            if 'website' in item:
-                item['website_id'] = item.pop('website')
+        # Field mapping
+        if 'website' in item:
+            item['website_id'] = item.pop('website')
 
-            # Check required fields
-            company_name = item.get("name")
-            website_id = item.get("website_id ")
-            if not company_name or not website_id:
-                raise DropItem(f"⚠️ Required fields missing, dropping item: name={company_name}, website_id={website_id}")
+        # Check required fields
+        company_name = item.get("name")
+        website_id = item.get("website_id")
+        if not company_name or not website_id:
+            return None
 
+        try:
+            DB.insert_company(item)
+        except Exception as e:
+            spider.logger.warning(f"❌ Insert failed: company_name={company_name}, error={e}")
-            try:
-                DB.insert_company(item)
-            except Exception as e:
-                spider.logger.warning(f"❌ Insert failed: company_name={company_name}, error={e}")
-
-
-        return item
\ No newline at end of file
+        return item
diff --git a/TS_resume_spider/spiders/zhrczp_com_compary.py b/TS_resume_spider/spiders/zhrczp_com_compary.py
index 993f7ee..2cfbb32 100644
--- a/TS_resume_spider/spiders/zhrczp_com_compary.py
+++ b/TS_resume_spider/spiders/zhrczp_com_compary.py
@@ -68,7 +68,7 @@ class ZunHuaComSpider(scrapy.Spider):
         }
 
     async def start(self) -> Iterable[scrapy.Request]:
-        for page in range(1, 100000):
+        for page in range(1000, 100_000):
             yield scrapy.Request(
                 url=f"https://www.zhrczp.com/company/{page}.html",
                 headers=self.headers,
@@ -77,9 +77,7 @@ class ZunHuaComSpider(scrapy.Spider):
             )
 
     def parse(self, response):
-        # Parse with lxml
         xpathobj = etree.HTML(response.text)
-        # Call the shared extraction helper
         company_data = extract_company_data(xpathobj)
         if company_data:
             yield company_data
\ No newline at end of file
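
For reviewers: the de-duplicated CompanySavePipeline now renames the scraped
'website' field to the model's 'website_id' column and drops incomplete items
by returning None rather than raising DropItem. A minimal behavioural sketch of
that contract (a hypothetical test, not part of the patch; DB.insert_company is
stubbed out so no MySQL connection is needed):

    import logging
    from types import SimpleNamespace

    from TS_resume_spider import pipelines

    # Stub out the DB write; we only want to observe the pipeline's contract.
    pipelines.DB.insert_company = staticmethod(lambda item: None)

    spider = SimpleNamespace(name='zhrczp_com_compary',
                             logger=logging.getLogger('demo'))
    pipe = pipelines.CompanySavePipeline()

    # 'website' is remapped onto 'website_id' before the insert
    out = pipe.process_item({'name': 'Acme', 'website': 3}, spider)
    assert out['website_id'] == 3

    # items missing name or website_id are now dropped silently
    assert pipe.process_item({'name': 'Acme'}, spider) is None
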
diff --git a/TS_resume_spider/utils/db.py b/TS_resume_spider/utils/db.py
index f5521b9..bf6564c 100644
--- a/TS_resume_spider/utils/db.py
+++ b/TS_resume_spider/utils/db.py
@@ -3,20 +3,18 @@ import os
 from datetime import datetime
 
 from sqlalchemy import (
-    create_engine, MetaData, Table, Column,
+    create_engine, MetaData, Table, Column, Integer,
     BigInteger, String, Text, DateTime, text  # <-- imports text
 )
 from sqlalchemy.dialects.mysql import insert as mysql_insert
 from sqlalchemy.engine.url import URL
 
-# —— Database config —— #
 DB_USER = os.getenv('DB_USER', 'tsreshub_prod')
 DB_PASS = os.getenv('DB_PASS', 'Tr5h$Prod!92@TsRH')
 DB_HOST = os.getenv('DB_HOST', '39.101.135.56')
 DB_PORT = int(os.getenv('DB_PORT', 3306))
 DB_NAME = os.getenv('DB_NAME', 'tsreshub_db')
 
-# —— Build a safe connection URL —— #
 db_url = URL.create(
     drivername="mysql+pymysql",
     username=DB_USER,
@@ -26,33 +24,99 @@ db_url = URL.create(
     database=DB_NAME,
     query={"charset": "utf8mb4"}
 )
-
-# —— Create the engine —— #
 engine = create_engine(db_url, echo=False, pool_pre_ping=True)
 
-# —— Metadata and table definitions —— #
 metadata = MetaData()
 
 companies = Table(
     'companies_company', metadata,
-    Column('id', BigInteger, primary_key=True, autoincrement=True),
-    Column('name', String(200), nullable=False, unique=True),
-    Column('category', String(100)),
-    Column('size', String(50)),
+    Column('id', BigInteger, primary_key=True, autoincrement=True),
+    Column('name', String(200), nullable=False, unique=True),
+    Column('category', String(100)),
+    Column('size', String(50)),
     Column('company_type', String(100)),
     Column('founded_date', String(100)),
     Column('introduction', Text, nullable=False),
-    Column('address', String(300), nullable=False),
-    Column('benefits', Text),
-    Column('website_id', BigInteger),
-    Column('created_at', DateTime, default=datetime.utcnow),
-    Column('updated_at', DateTime, default=datetime.utcnow, onupdate=datetime.utcnow),
+    Column('address', String(300), nullable=False),
+    Column('benefits', Text),
+    Column('website_id', BigInteger),
+    Column('created_at', DateTime, default=datetime.utcnow),
+    Column('updated_at', DateTime, default=datetime.utcnow, onupdate=datetime.utcnow),
+)
+
+resumes = Table(
+    'resumes_resumebasic', metadata,
+    Column('id', BigInteger, primary_key=True, autoincrement=True),
+    Column('resume_id', Integer, nullable=False, index=True),
+    Column('name', String(255)),
+    Column('job_region', String(255)),
+    Column('birthday', String(255)),
+    Column('education', String(255)),
+    Column('school', String(255)),
+    Column('expected_position', String(255)),
+    Column('last_active_time', String(255)),
+    Column('marital_status', String(255)),
+    Column('current_location', String(255)),
+    Column('age', Integer),
+    Column('phone', String(255)),
+    Column('gender', String(255)),
+    Column('job_type', String(255)),
+    Column('job_status', String(255)),
+    Column('work_1_experience', Text),
+    Column('work_1_time', String(255)),
+    Column('work_1_description', Text),
+    Column('work_2_experience', Text),
+    Column('work_2_time', String(255)),
+    Column('work_2_description', Text),
+    Column('work_3_experience', Text),
+    Column('work_3_time', String(255)),
+    Column('work_3_description', Text),
+    Column('work_4_experience', Text),
+    Column('work_4_time', String(255)),
+    Column('work_4_description', Text),
+    Column('height', Integer),
+    Column('weight', Integer),
+    Column('work_years', String(255)),
+    Column('highest_education', String(255)),
+    Column('ethnicity', String(255)),
+    Column('update_time', DateTime),
+    Column('job_function', String(255)),
+    Column('intended_position', String(255)),
+    Column('industry', String(255)),
+    Column('expected_salary', String(255)),
+    Column('available_time', String(255)),
+    Column('job_property', String(255)),
+    Column('job_location', String(255)),
+    Column('crawl_keywords', String(255)),
+    Column('source_id', BigInteger),  # FK to websites_website(id)
+    Column('created_at', DateTime, default=datetime.utcnow),
+    Column('updated_at', DateTime, default=datetime.utcnow, onupdate=datetime.utcnow),
 )
 
-# (Optional) create the tables on first run:
-# metadata.create_all(engine)
 
 class DB:
+    @classmethod
+    def insert_resume(cls, data: dict):
+        safe = {
+            k: v for k, v in data.items()
+            if k in resumes.c and isinstance(v, (str, int, float, type(None), datetime))
+        }
+        resume_id = safe.get('resume_id')
+        source_id = safe.get('source_id')
+        if resume_id is None or source_id is None:
+            print("❌ resume_id and source_id are both required for insert")
+            return
+        stmt = mysql_insert(resumes).values(**safe)
+        update_cols = {
+            col.name: stmt.inserted[col.name]
+            for col in resumes.c
+            if col.name not in ('id', 'created_at')
+        }
+        stmt = stmt.on_duplicate_key_update(**update_cols)
+
+        with engine.begin() as conn:
+            conn.execute(stmt)
+        print(f"✅ Resume inserted/updated: resume_id={resume_id}, source_id={source_id}")
+
     @classmethod
     def insert_company(cls, data: dict):
         safe = {
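
The new insert_resume follows the same MySQL upsert idiom as the existing
insert_company: INSERT ... ON DUPLICATE KEY UPDATE, overwriting every column
except the surrogate primary key and created_at. A standalone sketch of the
idiom (table and columns invented for illustration):

    from sqlalchemy import Column, Integer, MetaData, String, Table
    from sqlalchemy.dialects import mysql
    from sqlalchemy.dialects.mysql import insert as mysql_insert

    md = MetaData()
    demo = Table(
        'demo', md,
        Column('id', Integer, primary_key=True, autoincrement=True),
        Column('resume_id', Integer, unique=True),  # duplicate key that triggers UPDATE
        Column('name', String(255)),
    )

    stmt = mysql_insert(demo).values(resume_id=1, name='test')
    # On conflict, overwrite everything except the surrogate key.
    stmt = stmt.on_duplicate_key_update(
        {c.name: stmt.inserted[c.name] for c in demo.c if c.name != 'id'}
    )
    print(stmt.compile(dialect=mysql.dialect()))
    # Renders roughly as:
    #   INSERT INTO demo (resume_id, name) VALUES (%s, %s)
    #   ON DUPLICATE KEY UPDATE resume_id = VALUES(resume_id), name = VALUES(name)

Note that ON DUPLICATE KEY UPDATE only fires on a UNIQUE or PRIMARY KEY
collision; the resumes table as declared here indexes resume_id non-uniquely,
so the upsert path depends on the unique keys present in the live MySQL schema.
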
@@ -77,18 +141,17 @@ class DB:
             conn.execute(stmt)
         print(f"✅ Inserted/updated: {safe['name']}")
 
+
 if __name__ == '__main__':
-    # Connection test: wrap the SQL string with text()
     print("→ Trying to connect to the database…")
     try:
         with engine.connect() as conn:
-            conn.execute(text("SELECT 1"))  # <-- uses text()
+            conn.execute(text("SELECT 1"))
             print("✅ Database connection succeeded")
     except Exception as e:
         print(f"❌ Could not connect to the database: {e}")
         exit(1)
 
-    # Test data insert
     test_data = {
        'name': '河北遵一建设工程有限公司',
        'category': '房地产/建筑/工程',
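
The comments deleted in the last hunk explained why "SELECT 1" is wrapped in
text(): SQLAlchemy 2.x no longer accepts a plain string in Connection.execute().
A self-contained demonstration (throwaway in-memory SQLite engine, not the
project's MySQL setup):

    from sqlalchemy import create_engine, text

    eng = create_engine("sqlite://")  # in-memory engine, just for the demo
    with eng.connect() as conn:
        print(conn.execute(text("SELECT 1")).scalar())  # -> 1
        # conn.execute("SELECT 1")  # ObjectNotExecutableError in SQLAlchemy 2.x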