I am scraping a large amount of data from a website, and the problem is that inserting it into the database row by row takes far too long. I am looking for a smart way to bulk or batch insert into the database so that pushing the data does not take forever. I am using SQLAlchemy 1.4
ORM and the Scrapy framework.
Models:
from sqlalchemy import Column, Date, String, Integer, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from . import settings
engine = create_engine(settings.DATABSE_URL)
Session = sessionmaker(bind=engine)
session = Session()
DeclarativeBase = declarative_base()
class Olx_Eg(DeclarativeBase):
    """
    Defines the property listing model
    """

    __tablename__ = "olx_egypt"

    _id = Column(Integer, primary_key=True)
    URL = Column("URL", String)
    Breadcrumb = Column("Breadcrumb", String)
    Price = Column("Price", String)
    Title = Column("Title", String)
    Type = Column("Type", String)
    Bedrooms = Column("Bedrooms", String)
    Bathrooms = Column("Bathrooms", String)
    Area = Column("Area", String)
    Location = Column("Location", String)
    Compound = Column("Compound", String)
    seller = Column("seller", String)
    Seller_member_since = Column("Seller_member_since", String)
    Seller_phone_number = Column("Seller_phone_number", String)
    Description = Column("Description", String)
    Amenities = Column("Amenities", String)
    Reference = Column("Reference", String)
    Listed_date = Column("Listed_date", String)
    Level = Column("Level", String)
    Payment_option = Column("Payment_option", String)
    Delivery_term = Column("Delivery_term", String)
    Furnished = Column("Furnished", String)
    Delivery_date = Column("Delivery_date", String)
    Down_payment = Column("Down_payment", String)
    Image_url = Column("Image_url", String)
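The question does not show where the table itself gets created; assuming it still needs to be created once, a minimal addition to this models module could be:

# Create the olx_egypt table if it does not already exist (assumed setup step, not shown in the question)
DeclarativeBase.metadata.create_all(engine)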
This is my current Scrapy pipeline:
from olx_egypt.models import Olx_Eg, session


class OlxEgPipeline:
    def __init__(self):
        """
        Initializes database connection and sessionmaker.
        Creates items table.
        """

    def process_item(self, item, spider):
        """
        Process the item and store to database.
        """
        # session = self.Session()
        instance = session.query(Olx_Eg).filter_by(Reference=item["Reference"]).first()
        if instance:
            return instance
        else:
            olx_item = Olx_Eg(**item)
            session.add(olx_item)
            try:
                session.commit()
            except:
                session.rollback()
                raise
            finally:
                session.close()
        return item
I tried creating a list, appending the items to it, and then pushing them to the db when the spider closes:
from olx_egypt.models import Olx_Eg, session


class ExampleScrapyPipeline:
    def __init__(self):
        self.items = []

    def process_item(self, item, spider):
        self.items.append(item)
        return item

    def close_spider(self, spider):
        try:
            session.bulk_insert_mappings(Olx_Eg, self.items)
            session.commit()
        except Exception as error:
            session.rollback()
            raise
        finally:
            session.close()
But it fails on the session.bulk_insert_mappings(Olx_Eg, self.items) line. Can anyone tell me how to make a Scrapy pipeline do bulk or batch inserts?
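One thing worth checking: session.bulk_insert_mappings expects a sequence of plain dictionaries. If the buffered items are Scrapy Item objects, converting them first may be required; a sketch of a drop-in close_spider variant under that assumption (hypothetical, not a confirmed fix):

    def close_spider(self, spider):
        try:
            # Convert Scrapy items to plain dicts before handing them to SQLAlchemy
            mappings = [dict(item) for item in self.items]
            session.bulk_insert_mappings(Olx_Eg, mappings)
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()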
1 Answer
I am actually doing something very similar and built a pipeline that inserts the data using pandas.to_sql. It needs fewer lines of code and is quite fast because I have enabled method='multi'; if you are uploading to mssql you can also take advantage of fast_executemany=True, as described in this post: Speeding up pandas.DataFrame.to_sql with fast_executemany of pyODBC. I have tried to make it as generic as possible so it works with different driver names.
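A sketch of that engine setup (the connection string below is a placeholder, and fast_executemany only applies to the mssql+pyodbc dialect):

from sqlalchemy import create_engine

# Placeholder mssql+pyodbc URL; fast_executemany batches pyODBC's executemany calls
engine = create_engine(
    "mssql+pyodbc://user:password@server/database?driver=ODBC+Driver+17+for+SQL+Server",
    fast_executemany=True,
)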
Here is an example:
scraper.py
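The answer's original snippet is not reproduced here; purely as an illustration, a spider feeding the pipeline could look roughly like this (the URL, selectors and OlxEgItem class are placeholders):

import scrapy

from olx_egypt.items import OlxEgItem  # hypothetical item class, see the items.py sketch below


class OlxEgSpider(scrapy.Spider):
    name = "olx_eg"
    start_urls = ["https://www.olx.com.eg/en/properties/"]  # placeholder URL

    def parse(self, response):
        # Placeholder selectors; the real listing markup will differ
        for listing in response.css("li.listing"):
            item = OlxEgItem()
            item["URL"] = response.urljoin(listing.css("a::attr(href)").get())
            item["Title"] = listing.css("h2::text").get()
            item["Price"] = listing.css(".price::text").get()
            yield item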
items.py
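Likewise, a sketch of an items.py whose fields mirror the columns of the Olx_Eg model (trimmed for brevity; an assumption, not the answer's file):

import scrapy


class OlxEgItem(scrapy.Item):
    # One Field per column of the olx_egypt table; only a few are shown here
    URL = scrapy.Field()
    Title = scrapy.Field()
    Price = scrapy.Field()
    Reference = scrapy.Field()
    # ... the remaining fields follow the same pattern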
pipelines.py
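The core of the approach is a pipeline that buffers the items and writes them with a single pandas.to_sql call; a sketch under the assumptions above (table name and engine reused from the question's models module):

import pandas as pd

from olx_egypt.models import engine


class OlxEgPandasPipeline:
    def __init__(self):
        self.items = []

    def process_item(self, item, spider):
        # Buffer each item as a plain dict so it becomes one DataFrame row
        self.items.append(dict(item))
        return item

    def close_spider(self, spider):
        if not self.items:
            return
        df = pd.DataFrame(self.items)
        # method="multi" sends many rows per INSERT; if_exists="append" keeps the existing table
        df.to_sql("olx_egypt", con=engine, if_exists="append", index=False, method="multi")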
settings.py:
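And the pipeline has to be enabled in settings.py; a minimal sketch (the dotted path assumes the hypothetical class above):

# Enable the pandas-based pipeline (class name taken from the sketch above)
ITEM_PIPELINES = {
    "olx_egypt.pipelines.OlxEgPandasPipeline": 300,
}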
Even if you want the table to be created for you, you need to use if_exists and set it to append. Because Scrapy is single-threaded, it will create the table and then append the values after each reactor loop. I hope this helps with your speed problem; I have not tested it with very large amounts of data yet.
It works on my end, check the image:
Update your items.py with the following:
and remove the following from your scraper:
Change it to:
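The exact snippets referred to above are not reproduced here; purely as an illustration of the yield-versus-return change described next, the end of the spider callback might change like this (assumed, not the answer's code):

    def parse(self, response):
        item = OlxEgItem()
        # ... populate the item fields ...
        # yield item   # before
        return item    # after: hand the item back directly instead of yielding it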
Then don't use yield, use return instead. It worked for me, so it should work for you.