In my previous question, I asked how to save scraped items in bulk using Scrapy:
Buffered items and bulk insert to Mysql using scrapy
With the help of @Alexandria, I can now hold 1000 items in a cache. However, my problem is that when that cache is flushed, the items are still written to MySQL one by one. The only issue left is speed, and I believe it comes from insufficiently optimized SQL code.
The saving logic is as follows: add each item to the products table, and if its product_id does not already exist there, also add it to the new_products table. (I run a separate background script that moves these rows between the old and new tables; that part works fine. In other words, at most about 50k rows are stored in total.)
My guess is that MySQL slows down during the insert into the new_products table, because it has to check whether the product_id already exists among the existing rows.
I would be very happy if you could show me a way to save 1000 items to the database in a single operation.
The pipeline.py I am using:
from __future__ import print_function
import logging

import mysql.connector
from mysql.connector import errorcode
from scrapy import signals
from itemadapter import ItemAdapter

from amazon_scraper.items import AmazonMobileDetailsItem


class AmazonScraperPipeline:
    table = 'products'
    table2 = 'new_products'
    conf = {
        'host': 'localhost',
        'user': 'xxxxxx',
        'password': 'xxxxxx',
        'database': 'xxxxxxx',
        'raise_on_warnings': True
    }

    def __init__(self, **kwargs):
        self._rows = []           # store rows temporarily
        self._cached_rows = 0     # number of cached rows
        self._cache_limit = 1000  # limit before saving to database
        self.cnx = self.mysql_connect()

    def open_spider(self, spider):
        print("spider open")

    def save_all(self):  # calls self.save method for all cached rows
        if len(self._rows) > 0:
            list(map(self.save, self._rows))
            self._cached_rows = 0  # reset the count
            self._rows = []        # reset the cache

    def cache_result(self, item):  # adds new row to cache
        self._rows.append(dict(item))
        self._cached_rows += 1
        if self._cached_rows >= self._cache_limit:  # checks if limit reached
            self.save_all()  # if it has been reached then save all rows

    def process_item(self, item, spider):
        print("Saving item into db ...")
        self.cache_result(item)  # cache this item
        return item

    def close_spider(self, spider):
        self.save_all()  # saves remaining rows once spider closes
        self.cnx.close()

    def mysql_connect(self):
        try:
            return mysql.connector.connect(**self.conf)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your user name or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
            else:
                print(err)

    def save(self, row):
        cursor = self.cnx.cursor()
        # Fetch all known product_ids to decide whether this row is new
        cursor.execute("SELECT DISTINCT product_id FROM products;")
        existing_ids = [existing[0] for existing in cursor.fetchall()]
        create_query = ("INSERT INTO " + self.table +
                        " (rowid, date, listing_id, product_id, product_name, price, url) "
                        "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, "
                        "%(product_name)s, %(price)s, %(url)s)")
        # Insert the row into the products table
        cursor.execute(create_query, row)
        print("Item saved")
        product_id = row['product_id']
        if product_id not in existing_ids:
            create_query = ("INSERT INTO " + self.table2 +
                            " (product_rowid, date, listing_id, product_id, product_name, price, url) "
                            "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, "
                            "%(product_name)s, %(price)s, %(url)s)")
            cursor.execute(create_query, row)
            print("New Item saved")
        # Make sure data is committed to the database
        self.cnx.commit()
1 Answer
You can eliminate the first query in the save method by running it once at initialization, storing a copy of the result as an instance variable, and then updating that copy with new entries inside save. Another performance gain may come from the MySQL cursor's executemany feature, i.e., passing all the rows to the save method at once instead of one at a time. I'm actually curious how much of a performance improvement this gives, so please share the difference in timings.
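As a rough illustration of both suggestions, here is a minimal sketch of how __init__ and save_all could look if the existing product_ids are loaded once into an in-memory set and each flush writes the whole batch with executemany plus a single commit. Table and column names are copied from the question's pipeline; the COLUMNS tuple, the set cache, and the dict-to-tuple conversion are assumptions, not the asker's final code, and the per-row save method is no longer needed in this layout.

    # Drop-in replacements for __init__ and save_all in the pipeline above (sketch).
    COLUMNS = ('rowid', 'date', 'listing_id', 'product_id',
               'product_name', 'price', 'url')

    def __init__(self, **kwargs):
        self._rows = []
        self._cached_rows = 0
        self._cache_limit = 1000
        self.cnx = self.mysql_connect()
        cursor = self.cnx.cursor()
        # Run the existence query once and keep the result in memory.
        cursor.execute("SELECT DISTINCT product_id FROM products;")
        self.existing_ids = {pid for (pid,) in cursor.fetchall()}
        cursor.close()

    def save_all(self):
        if not self._rows:
            return
        insert_products = (
            "INSERT INTO products "
            "(rowid, date, listing_id, product_id, product_name, price, url) "
            "VALUES (%s, %s, %s, %s, %s, %s, %s)")
        insert_new_products = (
            "INSERT INTO new_products "
            "(product_rowid, date, listing_id, product_id, product_name, price, url) "
            "VALUES (%s, %s, %s, %s, %s, %s, %s)")
        # Convert the cached dicts to tuples in column order for executemany().
        all_values = [tuple(r[c] for c in self.COLUMNS) for r in self._rows]
        new_values = [tuple(r[c] for c in self.COLUMNS) for r in self._rows
                      if r['product_id'] not in self.existing_ids]
        cursor = self.cnx.cursor()
        cursor.executemany(insert_products, all_values)       # one batched INSERT
        if new_values:
            cursor.executemany(insert_new_products, new_values)
        self.cnx.commit()                                     # one commit per batch
        cursor.close()
        # Keep the in-memory id set current so later batches stay correct.
        self.existing_ids.update(r['product_id'] for r in self._rows)
        self._rows = []
        self._cached_rows = 0

With this layout, a flush of 1000 cached items issues at most two batched INSERT statements and one commit, instead of roughly 1000 SELECTs, up to 2000 single-row INSERTs, and 1000 commits, which is where I would expect most of the speedup to come from.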