Refactor database connection and company insert logic; update spider domain handling

晓丰 2025-05-24 22:52:17 +08:00
parent 45b281e2d7
commit 0ed52acfb9
4 changed files with 113 additions and 94 deletions


@@ -118,16 +118,25 @@ class YTSavePipeline:
         return item


 class CompanySavePipeline:
     def process_item(self, item, spider):
         if spider.name not in ['zhrczp_com_compary']:
             return item

-        company_name = item.get("company_name")
-        if not company_name:
-            raise DropItem("⚠️ company_name missing, item dropped")
+        # Map field names
+        if 'website' in item:
+            item['website_id'] = item.pop('website')
+
+        # Check required fields
+        company_name = item.get("name")
+        website_id = item.get("website_id")
+        if not company_name or not website_id:
+            raise DropItem(f"⚠️ Required fields missing, item dropped: name={company_name}, website_id={website_id}")

         try:
             DB.insert_company(item)
         except Exception as e:
             spider.logger.warning(f"❌ Insert failed: company_name={company_name}, error={e}")

         return item
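Note that the pipeline above only takes effect if it is registered in the project's Scrapy settings. A minimal sketch, assuming a module path of the form jobspider.pipelines (the actual path is not visible in this diff; the priority values are placeholders):

    # settings.py (hypothetical module path, for illustration only)
    ITEM_PIPELINES = {
        'jobspider.pipelines.YTSavePipeline': 300,
        'jobspider.pipelines.CompanySavePipeline': 310,
    }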


@@ -8,7 +8,7 @@ from scrapy import Request

 class ZunHuaComSpider(scrapy.Spider):
     name = 'zhrczp_com'
-    allowed_domains = ['zhrczp.com']
+    allowed_domains = ['www.zhrczp.com']
     start_urls = ['https://www.zhrczp.com/member/index.php']
     cookies = {
         'Hm_lvt_115013d5b34e45eb09d0baedeb1c845a': '1745062179',
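For reference, Scrapy's offsite filtering treats an allowed_domains entry as covering that host and its subdomains, so both the old and the new value accept the www host. The check below uses Scrapy's public URL helper; it is an illustration, not part of the commit:

    from scrapy.utils.url import url_is_from_any_domain

    url = 'https://www.zhrczp.com/member/index.php'
    print(url_is_from_any_domain(url, ['zhrczp.com']))      # True: subdomains of zhrczp.com match
    print(url_is_from_any_domain(url, ['www.zhrczp.com']))  # True: exact host match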


@@ -48,7 +48,7 @@ def extract_company_data(xpathobj):

 class ZunHuaComSpider(scrapy.Spider):
     name = 'zhrczp_com_compary'
-    allowed_domains = ['zhrczp.com']
+    allowed_domains = ['zhrczp.com', 'www.zhrczp.com']
     headers = {
         'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
         'Accept-Language': 'zh-CN,zh;q=0.9',
@@ -67,14 +67,13 @@ class ZunHuaComSpider(scrapy.Spider):
         'sec-ch-ua-platform': '"Windows"',
     }

-    def start_requests(self) -> Iterable[scrapy.Request]:
+    async def start(self) -> Iterable[scrapy.Request]:
         for page in range(1, 100000):
-            url = f"https://www.zhrczp.com/company/{page}.html"
             yield scrapy.Request(
-                url=url,
-                method='GET',
+                url=f"https://www.zhrczp.com/company/{page}.html",
                 headers=self.headers,
                 callback=self.parse,
+                dont_filter=True,  # disables the duplicate filter, if needed
             )

     def parse(self, response):
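The rename from start_requests() to an async start() matches the asynchronous start-request API introduced in Scrapy 2.13; older Scrapy versions ignore start() and still look for start_requests(). A version-tolerant sketch, assuming only the standard Scrapy API (spider name and URL are placeholders, not taken from the commit):

    import scrapy

    class CompatDemoSpider(scrapy.Spider):
        name = 'compat_demo'  # placeholder name, illustration only

        async def start(self):            # picked up by Scrapy >= 2.13
            for request in self.start_requests():
                yield request

        def start_requests(self):         # fallback for older Scrapy versions
            yield scrapy.Request('https://www.zhrczp.com/company/1.html', callback=self.parse)

        def parse(self, response):
            self.logger.info('fetched %s', response.url)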


@@ -1,92 +1,103 @@
 # -*- coding: utf-8 -*-
+import os
 from datetime import datetime
-import pymysql
+from sqlalchemy import (
+    create_engine, MetaData, Table, Column,
+    BigInteger, String, Text, DateTime, text  # <-- import text
+)
+from sqlalchemy.dialects.mysql import insert as mysql_insert
+from sqlalchemy.engine.url import URL
+
+# —— Database configuration —— #
+DB_USER = os.getenv('DB_USER', 'tsreshub_prod')
+DB_PASS = os.getenv('DB_PASS', 'Tr5h$Prod!92@TsRH')
+DB_HOST = os.getenv('DB_HOST', '39.101.135.56')
+DB_PORT = int(os.getenv('DB_PORT', 3306))
+DB_NAME = os.getenv('DB_NAME', 'tsreshub_db')

-class MySQLClient:
-    def __init__(self, host, user, password, db, port=3306):
-        self.conn = pymysql.connect(
-            host=host,
-            user=user,
-            password=password,
-            db=db,
-            port=port,
-            charset='utf8mb4',
-            cursorclass=pymysql.cursors.DictCursor,
-            autocommit=True
-        )
-        self.cursor = self.conn.cursor()
+# —— Build the connection URL safely —— #
+db_url = URL.create(
+    drivername="mysql+pymysql",
+    username=DB_USER,
+    password=DB_PASS,
+    host=DB_HOST,
+    port=DB_PORT,
+    database=DB_NAME,
+    query={"charset": "utf8mb4"}
+)

-    def execute(self, sql, values=None):
-        try:
-            self.cursor.execute(sql, values or [])
-        except Exception as e:
-            print(f"[MySQL] execution failed: {e}")
-            self.conn.rollback()
+# —— Create the engine —— #
+engine = create_engine(db_url, echo=False, pool_pre_ping=True)

-    def __del__(self):
-        try:
-            self.cursor.close()
-            self.conn.close()
-        except Exception:
-            pass
+# —— Define metadata and table —— #
+metadata = MetaData()
+
+companies = Table(
+    'companies_company', metadata,
+    Column('id', BigInteger, primary_key=True, autoincrement=True),
+    Column('name', String(200), nullable=False, unique=True),
+    Column('category', String(100)),
+    Column('size', String(50)),
+    Column('company_type', String(100)),
+    Column('founded_date', String(100)),
+    Column('introduction', Text, nullable=False),
+    Column('address', String(300), nullable=False),
+    Column('benefits', Text),
+    Column('website_id', BigInteger),
+    Column('created_at', DateTime, default=datetime.utcnow),
+    Column('updated_at', DateTime, default=datetime.utcnow, onupdate=datetime.utcnow),
+)
+
+# (Optional) create the table schema on first run:
+# metadata.create_all(engine)

 class DB:
-    _client: MySQLClient = None  # class attribute holding the connection
-
-    @classmethod
-    def init(cls):
-        if cls._client is None:
-            cls._client = MySQLClient(
-                host='39.101.135.56',
-                user='tsreshub_prod',
-                password='Tr5h$Prod!92@TsRH',
-                db='tsreshub_db',
-                port=3306
-            )
-
-    @classmethod
-    def insert_resume(cls, data: dict):
-        cls.init()  # make sure the connection is initialized
-        # keep only basic data types
-        safe_data = {k: v for k, v in data.items() if isinstance(v, (str, int, float, type(None), datetime))}
-        if 'resume_id' not in safe_data or 'source_id' not in safe_data:
-            # source_id + resume_id are required
-            return
-        table = 'resumes_resumebasic'
-        keys = ', '.join(safe_data.keys())
-        placeholders = ', '.join(['%s'] * len(safe_data))
-        # note: exclude source_id and resume_id from the update clause
-        update_clause = ', '.join([f"{k} = VALUES({k})" for k in safe_data if k not in ('source_id', 'resume_id')])
-        sql = f"""
-            INSERT INTO {table} ({keys}) VALUES ({placeholders})
-            ON DUPLICATE KEY UPDATE {update_clause}
-        """
-        cls._client.execute(sql, list(safe_data.values()))
-
     @classmethod
     def insert_company(cls, data: dict):
-        if cls._client is None:
-            raise RuntimeError("Database not initialized. Call DB.init() first.")
-
-        safe_data = {k: v for k, v in data.items() if isinstance(v, (str, int, float, type(None), datetime))}
-        if 'name' not in safe_data or 'website' not in safe_data:
+        safe = {
+            k: v for k, v in data.items()
+            if k in companies.c and isinstance(v, (str, int, float, type(None), datetime))
+        }
+        if 'website' in safe:
+            safe['website_id'] = safe.pop('website')
+        if not {'name', 'website_id'}.issubset(safe):
+            print("❌ name or website_id missing, cannot insert")
             return
-        table = 'companies_company'
-        keys = ', '.join(safe_data.keys())
-        holders = ', '.join(['%s'] * len(safe_data))
-        updates = ', '.join([f"{k}=VALUES({k})" for k in safe_data if k not in ('name', 'website')])
-        sql = (
-            f"INSERT INTO {table} ({keys}) VALUES ({holders}) "
-            f"ON DUPLICATE KEY UPDATE {updates}"
-        )
-        cls._client.execute(sql, list(safe_data.values()))
+
+        stmt = mysql_insert(companies).values(**safe)
+        update_cols = {
+            col.name: stmt.inserted[col.name]
+            for col in companies.c
+            if col.name not in ('id', 'created_at')
+        }
+        stmt = stmt.on_duplicate_key_update(**update_cols)
+
+        with engine.begin() as conn:
+            conn.execute(stmt)
+        print(f"✅ Insert/update succeeded: {safe['name']}")
+
+
+if __name__ == '__main__':
+    # Connection test: wrap the raw SQL string with text()
+    print("→ Trying to connect to the database…")
+    try:
+        with engine.connect() as conn:
+            conn.execute(text("SELECT 1"))  # <-- use text()
+        print("✅ Database connection succeeded")
+    except Exception as e:
+        print(f"❌ Could not connect to the database: {e}")
+        exit(1)
+
+    # Test inserting a record
+    test_data = {
+        'name': '河北遵一建设工程有限公司',
+        'category': '房地产/建筑/工程',
+        'size': '20-100人',
+        'company_type': '民营',
+        'founded_date': '',
+        'introduction': '河北遵一建设工程有限公司是一家诚信经营、具有良好口碑的建设工程公司……',
+        'address': '领袖嘉园西门口对面',
+        'benefits': '',
+        'website_id': 1,
+    }
+    DB.insert_company(test_data)
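For reference, the statement built in insert_company compiles to a MySQL upsert. The snippet below prints the generated SQL for a trimmed-down value set; it is an illustration that assumes it runs inside the module above (it reuses companies and mysql_insert) and is not code from the commit:

    from sqlalchemy.dialects import mysql

    stmt = mysql_insert(companies).values(name='demo', introduction='', address='', website_id=1)
    stmt = stmt.on_duplicate_key_update(name=stmt.inserted.name, website_id=stmt.inserted.website_id)
    print(stmt.compile(dialect=mysql.dialect()))
    # Compiles to roughly:
    # INSERT INTO companies_company (name, introduction, address, website_id)
    # VALUES (%s, %s, %s, %s)
    # ON DUPLICATE KEY UPDATE name = VALUES(name), website_id = VALUES(website_id)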