From 0ed52acfb9560b4f13a9352e0f9deba15b61b724 Mon Sep 17 00:00:00 2001 From: Franklin-F Date: Sat, 24 May 2025 22:52:17 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E6=95=B0=E6=8D=AE=E5=BA=93?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E5=92=8C=E5=85=AC=E5=8F=B8=E6=8F=92=E5=85=A5?= =?UTF-8?q?=E9=80=BB=E8=BE=91=EF=BC=9B=E6=9B=B4=E6=96=B0=E7=88=AC=E8=99=AB?= =?UTF-8?q?=E5=9F=9F=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- TS_resume_spider/pipelines.py | 29 +-- TS_resume_spider/spiders/zhrczp_com.py | 2 +- .../spiders/zhrczp_com_compary.py | 9 +- TS_resume_spider/utils/db.py | 167 ++++++++++-------- 4 files changed, 113 insertions(+), 94 deletions(-) diff --git a/TS_resume_spider/pipelines.py b/TS_resume_spider/pipelines.py index b7b9a3a..74562f6 100644 --- a/TS_resume_spider/pipelines.py +++ b/TS_resume_spider/pipelines.py @@ -119,15 +119,24 @@ class YTSavePipeline: class CompanySavePipeline: def process_item(self, item, spider): - if spider.name not in ['zhrczp_com_compary']: - return item - company_name = item.get("company_name") - if not company_name: - raise DropItem("⚠️ company_name 缺失,已丢弃") + def process_item(self, item, spider): + if spider.name not in ['zhrczp_com_compary']: + return item - try: - DB.insert_company(item) - except Exception as e: - spider.logger.warning(f"❌ 写入失败:company_name={company_name}, 错误={e}") + # 字段映射 + if 'website' in item: + item['website_id'] = item.pop('website') - return item \ No newline at end of file + # 检查必要字段 + company_name = item.get("name") + website_id = item.get("website_id") + if not company_name or not website_id: + raise DropItem(f"⚠️ 缺少必要字段,已丢弃: name={company_name}, website_id={website_id}") + + try: + DB.insert_company(item) + except Exception as e: + spider.logger.warning(f"❌ 写入失败:company_name={company_name}, 错误={e}") + + + return item \ No newline at end of file diff --git a/TS_resume_spider/spiders/zhrczp_com.py b/TS_resume_spider/spiders/zhrczp_com.py
index 9946ca3..49978f6 100644 --- a/TS_resume_spider/spiders/zhrczp_com.py +++ b/TS_resume_spider/spiders/zhrczp_com.py @@ -8,7 +8,7 @@ from scrapy import Request class ZunHuaComSpider(scrapy.Spider): name = 'zhrczp_com' - allowed_domains = ['zhrczp.com'] + allowed_domains = ['www.zhrczp.com'] start_urls = ['https://www.zhrczp.com/member/index.php'] cookies = { 'Hm_lvt_115013d5b34e45eb09d0baedeb1c845a': '1745062179', diff --git a/TS_resume_spider/spiders/zhrczp_com_compary.py b/TS_resume_spider/spiders/zhrczp_com_compary.py index 040acd4..993f7ee 100644 --- a/TS_resume_spider/spiders/zhrczp_com_compary.py +++ b/TS_resume_spider/spiders/zhrczp_com_compary.py @@ -48,7 +48,7 @@ def extract_company_data(xpathobj): class ZunHuaComSpider(scrapy.Spider): name = 'zhrczp_com_compary' - allowed_domains = ['zhrczp.com'] + allowed_domains = ['zhrczp.com', 'www.zhrczp.com'] headers = { 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7', 'Accept-Language': 'zh-CN,zh;q=0.9', @@ -67,14 +67,13 @@ class ZunHuaComSpider(scrapy.Spider): 'sec-ch-ua-platform': '"Windows"', } - def start_requests(self) -> Iterable[scrapy.Request]: + async def start(self) -> Iterable[scrapy.Request]: for page in range(1, 100000): - url = f"https://www.zhrczp.com/company/{page}.html" yield scrapy.Request( - url=url, - method='GET', + url=f"https://www.zhrczp.com/company/{page}.html", headers=self.headers, callback=self.parse, + dont_filter=True, # 如果需要关闭重复过滤 ) def parse(self, response): diff --git a/TS_resume_spider/utils/db.py b/TS_resume_spider/utils/db.py index df8b41d..f5521b9 100644 --- a/TS_resume_spider/utils/db.py +++ b/TS_resume_spider/utils/db.py @@ -1,92 +1,103 @@ +# -*- coding: utf-8 -*- +import os from datetime import datetime -import pymysql +from sqlalchemy import ( + create_engine, MetaData, Table, Column, + BigInteger, String, Text, DateTime, text # <-- 导入 text +) +from 
sqlalchemy.dialects.mysql import insert as mysql_insert +from sqlalchemy.engine.url import URL +# —— 数据库配置 —— # +DB_USER = os.getenv('DB_USER', 'tsreshub_prod') +DB_PASS = os.getenv('DB_PASS', 'Tr5h$Prod!92@TsRH') +DB_HOST = os.getenv('DB_HOST', '39.101.135.56') +DB_PORT = int(os.getenv('DB_PORT', 3306)) +DB_NAME = os.getenv('DB_NAME', 'tsreshub_db') -class MySQLClient: - def __init__(self, host, user, password, db, port=3306): - self.conn = pymysql.connect( - host=host, - user=user, - password=password, - db=db, - port=port, - charset='utf8mb4', - cursorclass=pymysql.cursors.DictCursor, - autocommit=True - ) - self.cursor = self.conn.cursor() +# —— 生成安全的 URL —— # +db_url = URL.create( + drivername="mysql+pymysql", + username=DB_USER, + password=DB_PASS, + host=DB_HOST, + port=DB_PORT, + database=DB_NAME, + query={"charset": "utf8mb4"} +) - def execute(self, sql, values=None): - try: - self.cursor.execute(sql, values or []) +# —— 创建 Engine —— # +engine = create_engine(db_url, echo=False, pool_pre_ping=True) - except Exception as e: - print(f"[MySQL] 执行失败: {e}") - self.conn.rollback() +# —— 定义元数据与表 —— # +metadata = MetaData() - def __del__(self): - try: - self.cursor.close() - self.conn.close() - except Exception: - pass +companies = Table( + 'companies_company', metadata, + Column('id', BigInteger, primary_key=True, autoincrement=True), + Column('name', String(200), nullable=False, unique=True), + Column('category', String(100)), + Column('size', String(50)), + Column('company_type', String(100)), + Column('founded_date', String(100)), + Column('introduction', Text, nullable=False), + Column('address', String(300), nullable=False), + Column('benefits', Text), + Column('website_id', BigInteger), + Column('created_at', DateTime, default=datetime.utcnow), + Column('updated_at', DateTime, default=datetime.utcnow, onupdate=datetime.utcnow), +) +# (可选)首次创建表结构: +# metadata.create_all(engine) class DB: - _client: MySQLClient = None # 类属性持有连接 - - @classmethod - def 
init(cls): - if cls._client is None: - cls._client = MySQLClient( - host='39.101.135.56', - user='tsreshub_prod', - password='Tr5h$Prod!92@TsRH', - db='tsreshub_db', - port=3306 - ) - - @classmethod - def insert_resume(cls, data: dict): - cls.init() # 保证连接已初始化 - - # 只保留基本数据类型 - safe_data = {k: v for k, v in data.items() if isinstance(v, (str, int, float, type(None), datetime))} - - if 'resume_id' not in safe_data or 'source_id' not in safe_data: - # 必须有 source_id + resume_id - return - - table = 'resumes_resumebasic' - keys = ', '.join(safe_data.keys()) - placeholders = ', '.join(['%s'] * len(safe_data)) - - # 注意:update时排除 source_id 和 resume_id - update_clause = ', '.join([f"{k} = VALUES({k})" for k in safe_data if k not in ('source_id', 'resume_id')]) - - sql = f""" - INSERT INTO {table} ({keys}) VALUES ({placeholders}) - ON DUPLICATE KEY UPDATE {update_clause} - """ - - cls._client.execute(sql, list(safe_data.values())) - @classmethod def insert_company(cls, data: dict): - if cls._client is None: - raise RuntimeError("数据库未初始化。首先调用DB.init()。") - - safe_data = {k: v for k, v in data.items() if isinstance(v, (str, int, float, type(None), datetime))} - if 'name' not in safe_data or 'website' not in safe_data: + safe = { + k: v for k, v in data.items() + if k in companies.c and isinstance(v, (str, int, float, type(None), datetime)) + } + if 'website' in safe: + safe['website_id'] = safe.pop('website') + if not {'name', 'website_id'}.issubset(safe): + print("❌ 缺少 name 或 website_id,无法插入") return - table = 'companies_company' - keys = ', '.join(safe_data.keys()) - holders = ', '.join(['%s'] * len(safe_data)) - updates = ', '.join([f"{k}=VALUES({k})" for k in safe_data if k not in ('name','website')]) - sql = ( - f"INSERT INTO {table} ({keys}) VALUES ({holders}) " - f"ON DUPLICATE KEY UPDATE {updates}" - ) - cls._client.execute(sql, list(safe_data.values())) \ No newline at end of file + stmt = mysql_insert(companies).values(**safe) + update_cols = { + col.name: 
stmt.inserted[col.name] + for col in companies.c + if col.name not in ('id', 'created_at') + } + stmt = stmt.on_duplicate_key_update(**update_cols) + + with engine.begin() as conn: + conn.execute(stmt) + print(f"✅ 插入/更新成功:{safe['name']}") + +if __name__ == '__main__': + # 测试连接:用 text() 包装 SQL 字符串 + print("→ 尝试连接数据库…") + try: + with engine.connect() as conn: + conn.execute(text("SELECT 1")) # <-- 使用 text() + print("✅ 数据库连接成功") + except Exception as e: + print(f"❌ 无法连接数据库:{e}") + exit(1) + + # 测试插入数据 + test_data = { + 'name': '河北遵一建设工程有限公司', + 'category': '房地产/建筑/工程', + 'size': '20-100人', + 'company_type': '民营', + 'founded_date': '', + 'introduction': '河北遵一建设工程有限公司是一家诚信经营、具有良好口碑的建设工程公司……', + 'address': '领袖嘉园西门口对面', + 'benefits': '', + 'website_id': 1, + } + DB.insert_company(test_data)