scrapy 使用sqlalchemy批量插入碎片管道

xqnpmsa8  于 2022-11-09  发布在  其他
关注(0)|答案(1)|浏览(255)

我正在从一个网站上抓取大量的数据,问题是一个一个地插入到数据库中花费了太多的时间。我正在寻找一种智能的方法来批量插入或批量插入到数据库中,这样就不会像永远一样把它推到数据库中。我使用sqlalchemy1.4 orm和scrapy框架。
型号:

from sqlalchemy import Column, Date, String, Integer, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

from . import settings

engine = create_engine(settings.DATABSE_URL)
Session = sessionmaker(bind=engine)
session = Session()
DeclarativeBase = declarative_base()

class Olx_Eg(DeclarativeBase):
    """
    Defines the property listing model
    """

    __tablename__ = "olx_egypt"
    _id = Column(Integer, primary_key=True)
    URL = Column("URL", String)
    Breadcrumb = Column("Breadcrumb", String)
    Price = Column("Price", String)
    Title = Column("Title", String)
    Type = Column("Type", String)
    Bedrooms = Column("Bedrooms", String)
    Bathrooms = Column("Bathrooms", String)
    Area = Column("Area", String)
    Location = Column("Location", String)
    Compound = Column("Compound", String)
    seller = Column("seller", String)
    Seller_member_since = Column("Seller_member_since", String)
    Seller_phone_number = Column("Seller_phone_number", String)
    Description = Column("Description", String)
    Amenities = Column("Amenities", String)
    Reference = Column("Reference", String)
    Listed_date = Column("Listed_date", String)
    Level = Column("Level", String)
    Payment_option = Column("Payment_option", String)
    Delivery_term = Column("Delivery_term", String)
    Furnished = Column("Furnished", String)
    Delivery_date = Column("Delivery_date", String)
    Down_payment = Column("Down_payment", String)
    Image_url = Column("Image_url", String)

这是我现在的斗志旺盛的管道:

from olx_egypt.models import Olx_Eg, session

class OlxEgPipeline:
    def __init__(self):
        """
        Initializes database connection and sessionmaker.
        Creates items table.
        """

    def process_item(self, item, spider):
        """
        Process the item and store to database.
        """
        # session = self.Session()
        instance = session.query(Olx_Eg).filter_by(Reference=item["Reference"]).first()
        if instance:
            return instance
        else:
            olx_item = Olx_Eg(**item)
            session.add(olx_item)

        try:
            session.commit()
        except:
            session.rollback()
            raise
        finally:
            session.close()

        return item

我尝试创建一个列表并将项目附加到列表中,然后在关闭spider时将其推送到db:

from olx_egypt.models import Olx_Eg, session

class ExampleScrapyPipeline:

    def __init__(self):

        self.items = []

    def process_item(self, item, spider):

        self.items.append(item)

        return item

    def close_spider(self, spider):

        try:
            session.bulk_insert_mappings(Olx_Eg, self.items)
            session.commit()

        except Exception as error:
            session.rollback()
            raise

        finally:
            session.close()

但是它在session.bulk_insert_mappings(Olx_Eg, self.items)这一行失败了。谁能告诉我如何使scrapy pipeline批量或批插入?

mrzz3bfm

mrzz3bfm1#

我实际上在做一些非常相似的事情,并建立了一个管道来注入数据与使用pandas.to_sql,有更少的代码行需要和它的相当快,因为我已经激活了method='multi',如果你上传到mssql,那么你可以利用fast_executemany=True,如在这篇文章中提供:Speeding up pandas.DataFrame.to_sql with fast_executemany of pyODBC
我已经尝试使它尽可能通用,以访问不同的驱动程序名称。
下面是一个例子:
scraper.py

import scrapy
from scrapy_exercises.items import ScrapyExercisesItem
from scrapy.crawler import CrawlerProcess

class SQLTest(scrapy.Spider):
    name = 'SQL'
    start_urls = [f'https://quotes.toscrape.com/page/{i}/' for i in range(1, 11)]

    custom_settings = {
        "FEED": {"test" : {"format": "csv"}}
    }

    def start_requests(self):
        for url in self.start_urls:
            yield scrapy.Request(
                url=url,
                callback = self.parse
            )

    def parse(self, response):
        content = response.xpath("//div[@class='col-md-8']//div")
        for items in content:
            table = ScrapyExercisesItem()
            #table._name= items.xpath(".//span//@href").get()
            #table._keyword= items.xpath(".//div[@class = 'tags']//a[1]//text()").get()
            #yield table.returnTable()
            table['name'] = items.xpath(".//span//@href").get()
            table['keyword'] = items.xpath(".//div[@class = 'tags']//a[1]//text()").get()
            return table

items.py

import scrapy

class ScrapyExercisesItem(scrapy.Item):
    name = scrapy.Field()
    keyword = scrapy.Field()

pipelines.py

from sqlalchemy import create_engine, String
import pandas as pd
import pyodbc
import logging
from itemadapter import is_item
from itemadapter import ItemAdapter

logger = logging.getLogger(__name__)

class DataframeSQLPipelineInject:

    def __init__(self, user, passw, host, port, database, table, if_exists, drivername):
        self._user = user
        self._passw = passw
        self._host = host
        self._port = port
        self._database = database
        self.table = table
        self.if_exists = if_exists
        self.drivername = drivername

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            user = crawler.settings.get('DATABASE')['user'],
            passw = crawler.settings.get('DATABASE')['passw'],
            host = crawler.settings.get('DATABASE')['host'],
            port = crawler.settings.get('DATABASE')['port'],
            database = crawler.settings.get('DATABASE')['database'],
            table = crawler.settings.get('DATABASE')['table'],
            if_exists = crawler.settings.get('DATABASE')['if_exists'],
            drivername = crawler.settings.get('DATABASE')['drivername']
        )

    def open_spider(self, spider): 
        self.engine = create_engine(
            f'{self.drivername}://' + #change this to your required server
            self._user + ':' + 
            self._passw + '@' + 
            self._host + ':' + 
            str(self._port) + '/' + 
            self._database  ,#+f'?driver=ODBC+Driver+18+for+SQL+Server' , #change this to your required driver
            echo=False,
            #connect_args={"timeout":30},
                            pool_pre_ping=True

# fast_executemany=True

# --- Add if using drivername mssql+pyodbc,

# then remove if_exists = self.if_exists from table_df

                                                )

        self.conn = self.engine.connect()

    def close_spider(self, spider):
        self.conn.close()

    def process_item(self,item, spider):
        if is_item(item):
            table_df = pd.DataFrame([ItemAdapter(item).asdict()])
            print(table_df.dtypes)
            table_df.to_sql(self.table, con=self.engine,method='multi',dtype={'name':String(), 'keyword':String()}, chunksize=2000, index=False, if_exists = self.if_exists)
        else:
            logger.error(f'You need a dict for item, you have type: {type(item)}')

settings.py:

DATABASE = {
    "user": "usr",
    "passw": "",
    "host": "localhost",
    "port": '5432',
    "database": "scraper",
    'table':'some_table',
    'if_exists':'append',
    'drivername':'postgresql'
}

# Obey robots.txt rules

ROBOTSTXT_OBEY = False

ITEM_PIPELINES = {
    'scrapy_exercises.pipelines.sql_import.DataframeSQLPipelineInject':50
    }

即使你想创建一个表,你也需要使用if_exists并添加append。因为scrapy是单线程的,它会在每个React器循环后创建并附加值。
我希望这对你的速度问题有帮助,因为我还没有用大量的数据进行测试。
它在我这边工作,检查图像:

使用items.py以下内容更新您的www.example.com:

class ScrapyExercisesItem(scrapy.Item):
    URL = scrapy.Field()
    Breadcrumb = scrapy.Field()
    Price = scrapy.Field()
    Title = scrapy.Field()
    Type = scrapy.Field()
    Bedrooms = scrapy.Field()
    Bathrooms = scrapy.Field()
    Area = scrapy.Field()
    Location = scrapy.Field()
    keyword = scrapy.Field()
    Compound = scrapy.Field()
    seller = scrapy.Field()
    Seller_member_since = scrapy.Field()
    Seller_phone_number = scrapy.Field()
    Description = scrapy.Field()
    Amenities = scrapy.Field()
    Reference = scrapy.Field()
    Listed_date = scrapy.Field()
    Level = scrapy.Field()
    Payment_option = scrapy.Field()
    Delivery_term = scrapy.Field()
    Furnished = scrapy.Field()
    Delivery_date = scrapy.Field()
    Down_payment = scrapy.Field()
    Image_url = scrapy.Field()

并清除刮刀中的以下内容:

item = {}

将其改为:

from your_path.items import ScrapyExercisesItem
item = ScrapyExercisesItem()

那就不要用yield,而是用return。它对我有效,所以应该对你有效。

相关问题