添加职位数据表及相关插入逻辑;更新爬虫以提取公司和职位信息

This commit is contained in:
晓丰 2025-05-25 22:39:04 +08:00
parent 8812b91416
commit 542f2ce0bd
4 changed files with 124 additions and 38 deletions

View File

@ -123,11 +123,8 @@ class CompanySavePipeline:
if spider.name not in ['zhrczp_com_compary']: if spider.name not in ['zhrczp_com_compary']:
return item return item
# 字段映射
if 'website' in item: if 'website' in item:
item['website_id'] = item.pop('website') item['website_id'] = item.pop('website')
# 检查必要字段
company_name = item.get("name") company_name = item.get("name")
website_id = item.get("website_id") website_id = item.get("website_id")
if not company_name or not website_id: if not company_name or not website_id:
@ -136,5 +133,23 @@ class CompanySavePipeline:
DB.insert_company(item) DB.insert_company(item)
except Exception as e: except Exception as e:
spider.logger.warning(f"❌ 写入失败company_name={company_name}, 错误={e}") spider.logger.warning(f"❌ 写入失败company_name={company_name}, 错误={e}")
return item
class PositionSavePipeline:
def process_item(self, item, spider):
if spider.name not in ['zhrczp_com_position']:
return item
title = item.get("title")
company_name = item.pop("company_name")
item['company_id'] = DB.get_company_id(company_name)
if not title or not company_name:
return None
try:
DB.insert_position(item)
except Exception as e:
spider.logger.warning(f"❌ 写入失败title={title}, company_name={company_name}, 错误={e}")
return item return item

View File

@ -1,6 +1,7 @@
# Scrapy 项目 TS_resume_spider 的配置文件 # Scrapy 项目 TS_resume_spider 的配置文件
# 指定项目名称,默认会用在 User-Agent 和内部调用 # 指定项目名称,默认会用在 User-Agent 和内部调用
from scrapy.settings.default_settings import TELNETCONSOLE_ENABLED
BOT_NAME = "TS_resume_spider" BOT_NAME = "TS_resume_spider"
# 指定爬虫类所在的模块(路径) # 指定爬虫类所在的模块(路径)
@ -13,7 +14,10 @@ NEWSPIDER_MODULE = "TS_resume_spider.spiders"
# 是否遵守 robots.txt 规则(推荐 False # 是否遵守 robots.txt 规则(推荐 False
ROBOTSTXT_OBEY = False ROBOTSTXT_OBEY = False
# 是否启用日志记录(默认 True可设置为 False 以禁用
OFFSITE_ENABLED = False
LOG_LEVEL = "INFO" # 设置日志级别为 INFO减少输出量
TELNETCONSOLE_ENABLED = False
# 配置 Scrapy 最大并发请求数(默认 16 # 配置 Scrapy 最大并发请求数(默认 16
CONCURRENT_REQUESTS = 64 # 设置并发量为8减少服务器压力避免被断连 CONCURRENT_REQUESTS = 64 # 设置并发量为8减少服务器压力避免被断连
@ -57,7 +61,8 @@ RETRY_HTTP_CODES = [500, 502, 503, 504, 522, 524, 408, 429]
ITEM_PIPELINES = { ITEM_PIPELINES = {
'TS_resume_spider.pipelines.YTSpiderPipeline': 300, 'TS_resume_spider.pipelines.YTSpiderPipeline': 300,
'TS_resume_spider.pipelines.YTSavePipeline': 500, 'TS_resume_spider.pipelines.YTSavePipeline': 500,
'TS_resume_spider.pipelines.CompanySavePipeline': 600, 'TS_resume_spider.pipelines.CompanySavePipeline': 501,
'TS_resume_spider.pipelines.PositionSavePipeline': 502,
} }
# 设置输出文件编码,防止中文乱码 # 设置输出文件编码,防止中文乱码

View File

@ -3,9 +3,7 @@ import scrapy
from lxml import etree from lxml import etree
def extract_company_data(xpathobj): def extract_company_data(xpathobj):
"""从 etree.HTML 对象中提取公司信息,返回 dict 或 None。"""
def first_or_empty(path): def first_or_empty(path):
lst = xpathobj.xpath(path) lst = xpathobj.xpath(path)
return lst[0].strip() if lst else "" return lst[0].strip() if lst else ""
@ -30,7 +28,7 @@ def extract_company_data(xpathobj):
benefits = [b.strip() for b in xpathobj.xpath('//div[@class="com_welfare "]/span/text()') if b.strip()] benefits = [b.strip() for b in xpathobj.xpath('//div[@class="com_welfare "]/span/text()') if b.strip()]
benefits_str = " | ".join(benefits) benefits_str = " | ".join(benefits)
address = first_or_empty('//div[@class="com_details_tel_me"]/div/text()') address = first_or_empty('//div[@class="firm_name"]/div/text()')
return { return {
"name": name, "name": name,
@ -45,6 +43,10 @@ def extract_company_data(xpathobj):
} }
def get_company_href(xpathobj):
hrefs = xpathobj.xpath('//div[@class="firm_name"]/span/a/@href')
return [href.strip() for href in hrefs if href.strip()]
class ZunHuaComSpider(scrapy.Spider): class ZunHuaComSpider(scrapy.Spider):
name = 'zhrczp_com_compary' name = 'zhrczp_com_compary'
@ -55,7 +57,6 @@ class ZunHuaComSpider(scrapy.Spider):
'Cache-Control': 'no-cache', 'Cache-Control': 'no-cache',
'Connection': 'keep-alive', 'Connection': 'keep-alive',
'Pragma': 'no-cache', 'Pragma': 'no-cache',
'Referer': 'https://www.zhrczp.com/member/index.php?c=resume&jobin=76&jobclass_search=76&cityin=&cityclass_search=&keyword=&minsalary=&maxsalary=&minage=&maxage=&exp=&edu=&uptime=&sex=&type=',
'Sec-Fetch-Dest': 'document', 'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'same-origin', 'Sec-Fetch-Site': 'same-origin',
@ -67,16 +68,39 @@ class ZunHuaComSpider(scrapy.Spider):
'sec-ch-ua-platform': '"Windows"', 'sec-ch-ua-platform': '"Windows"',
} }
async def start(self) -> Iterable[scrapy.Request]: def start_requests(self) -> Iterable[scrapy.Request]:
for page in range(1000, 100_000):
for page in range(1, 186):
self.logger.info(f"Fetching company list page: {page}")
yield scrapy.Request( yield scrapy.Request(
url=f"https://www.zhrczp.com/company/{page}.html", url=f"https://www.zhrczp.com/company/list/0-0-0-0-0-0-{page}.html",
headers=self.headers, headers=self.headers,
callback=self.parse, callback=self.parse,
dont_filter=True, # 如果需要关闭重复过滤 dont_filter=True,
)
for page in range(1, 10):
self.logger.info(f"Fetching company list page: {page}")
yield scrapy.Request(
url=f"https://www.zhrczp.com/company/list/0-0-0-0-0-1-{page}.html",
headers=self.headers,
callback=self.parse,
dont_filter=True,
) )
def parse(self, response): def parse(self, response):
xpathobj = etree.HTML(response.text)
company_href = get_company_href(xpathobj)
if company_href:
for href in company_href:
self.logger.debug(href)
yield scrapy.Request(
url=href,
headers=self.headers,
callback=self.parse_company,
dont_filter=True,
)
def parse_company(self, response):
xpathobj = etree.HTML(response.text) xpathobj = etree.HTML(response.text)
company_data = extract_company_data(xpathobj) company_data = extract_company_data(xpathobj)
if company_data: if company_data:

View File

@ -1,9 +1,9 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import os import os
from datetime import datetime from datetime import datetime
from sqlalchemy.sql import select
from sqlalchemy import ( from sqlalchemy import (
create_engine, MetaData, Table, Column,Integer, create_engine, MetaData, Table, Column, Integer,
BigInteger, String, Text, DateTime, text # <-- 导入 text BigInteger, String, Text, DateTime, text # <-- 导入 text
) )
from sqlalchemy.dialects.mysql import insert as mysql_insert from sqlalchemy.dialects.mysql import insert as mysql_insert
@ -92,6 +92,26 @@ resumes = Table(
Column('updated_at', DateTime, default=datetime.utcnow, onupdate=datetime.utcnow), Column('updated_at', DateTime, default=datetime.utcnow, onupdate=datetime.utcnow),
) )
table_positions = Table(
'positions_position', metadata,
Column('id', BigInteger, primary_key=True, autoincrement=True),
Column('title', String(200), nullable=False),
Column('nature', String(50)),
Column('category', String(100)),
Column('region', String(100)),
Column('experience', String(100)),
Column('education', String(100)),
Column('salary', String(100)),
Column('company_id', BigInteger, nullable=False),
Column('website_id', BigInteger, nullable=False),
Column('benefits', Text),
Column('contact_info', String(200)),
Column('contact_name', String(100)),
Column('description', Text),
Column('openings', Integer),
Column('position_status', Integer),
)
class DB: class DB:
@classmethod @classmethod
@ -139,7 +159,28 @@ class DB:
with engine.begin() as conn: with engine.begin() as conn:
conn.execute(stmt) conn.execute(stmt)
print(f"✅ 插入/更新成功:{safe['name']}")
@classmethod
def get_company_id(cls, company_name: str):
stmt = select(companies.c.id).where(companies.c.name == company_name)
with engine.connect() as conn:
result = conn.execute(stmt).scalar()
return result
@classmethod
def insert_position(cls, data: dict):
company_id = data.get('company_id')
title = data.get('title')
website_id = data.get('website_id')
if not title or website_id is None:
return
safe = {k: v for k, v in data.items() if k in table_positions.c and k != 'company_name'}
safe['company_id'] = company_id
stmt = mysql_insert(table_positions).values(**safe)
update_cols = {col.name: stmt.inserted[col.name] for col in table_positions.c if col.name != 'id'}
stmt = stmt.on_duplicate_key_update(**update_cols)
with engine.begin() as conn:
conn.execute(stmt)
if __name__ == '__main__': if __name__ == '__main__':
@ -152,15 +193,16 @@ if __name__ == '__main__':
print(f"❌ 无法连接数据库:{e}") print(f"❌ 无法连接数据库:{e}")
exit(1) exit(1)
test_data = { # test_data = {
'name': '河北遵一建设工程有限公司', # 'name': '河北遵一建设工程有限公司',
'category': '房地产/建筑/工程', # 'category': '房地产/建筑/工程',
'size': '20-100人', # 'size': '20-100人',
'company_type': '民营', # 'company_type': '民营',
'founded_date': '', # 'founded_date': '',
'introduction': '河北遵一建设工程有限公司是一家诚信经营、具有良好口碑的建设工程公司……', # 'introduction': '河北遵一建设工程有限公司是一家诚信经营、具有良好口碑的建设工程公司……',
'address': '领袖嘉园西门口对面', # 'address': '领袖嘉园西门口对面',
'benefits': '', # 'benefits': '',
'website_id': 1, # 'website_id': 1,
} # }
DB.insert_company(test_data) # DB.insert_company(test_data)
print(DB.get_company_id("托管教育"))