SQL optimization to speed up bulk inserts with Scrapy

qzlgjiam · posted 2022-11-09 · in: Other
Follow (0) | Answers (1) | Views (108)

In my previous post I asked how to record items in bulk with Scrapy:
Buffered items and bulk insert to Mysql using scrapy
With @Alexandria's help I can now hold 1000 items in a cache. My problem is that when the cache is flushed to MySQL, the items are still written one by one. The only issue left is speed, and I think it comes from insufficiently optimized SQL code.
The logic for saving to SQL looks like this:
Add each item to the products table; if its product_id does not already exist there, also add it to the new_products table. (I run a background script that moves these rows from the old table to the new one; that part works fine. In other words, at most about 50k rows are recorded in total.)
MySQL is probably slowing down during the insert into the new_products table, because it has to check whether the product_id already exists among the current rows.
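To make the cost concrete, the save() method in my pipeline below currently runs roughly the following for every single item (a condensed sketch of my own code; the two INSERT strings are abbreviated here as products_insert_query and new_products_insert_query):

cursor = self.cnx.cursor()
cursor.execute("SELECT DISTINCT product_id FROM products;")   # re-reads every existing product_id for each item
existing_ids = [r[0] for r in cursor.fetchall()]
cursor.execute(products_insert_query, row)                     # INSERT INTO products ...
if row['product_id'] not in existing_ids:
    cursor.execute(new_products_insert_query, row)             # INSERT INTO new_products ...
self.cnx.commit()                                              # one commit per item
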
I would be very happy if you could show me a way to save 1000 items to the database in one go.
The pipeline.py I am using:

from __future__ import print_function
import logging
from scrapy import signals
from itemadapter import ItemAdapter
from mysql.connector import errorcode
from amazon_scraper.items import AmazonMobileDetailsItem
import mysql.connector

class AmazonScraperPipeline:
    table = 'products'
    table2 = 'new_products'
    conf = {
        'host': 'localhost',
        'user': 'xxxxxx',
        'password': 'xxxxxx',
        'database': 'xxxxxxx',
        'raise_on_warnings': True
    }

    def __init__(self,**kwargs):
        self._rows = []   #  store rows temporarily
        self._cached_rows = 0    # number of cached rows
        self._cache_limit = 1000   # limit before saving to database
        self.cnx = self.mysql_connect()

    def open_spider(self, spider):
        print("spider open")

    def save_all(self):    # calls self.save method for all cached rows
        if len(self._rows) > 0:
            list(map(self.save, self._rows))
            self._cached_rows = 0   # reset the count
            self._rows = []         # reset the cache

    def cache_result(self, item):  # adds new row to cache
        self._rows.append(dict(item))
        self._cached_rows += 1
        if self._cached_rows >= self._cache_limit: # checks if limit reached
            self.save_all()      # if it has been reached then save all rows

    def process_item(self, item, spider):
        print("Saving item into db ...")
        self.cache_result(item)    # cache this item
        return item

    def close_spider(self, spider):
        self.save_all()      # Saves remaining rows once spider closes
        self.cnx.close()

    def mysql_connect(self):
        try:
            return mysql.connector.connect(**self.conf)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your user name or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
            else:
                print(err)

    def save(self, row):
        cursor = self.cnx.cursor()
        cursor.execute("SELECT DISTINCT product_id FROM products;")
        existing_ids = [r[0] for r in cursor.fetchall()]  # distinct name so the 'row' argument is not shadowed
        create_query = ("INSERT INTO " + self.table +
            "(rowid, date, listing_id, product_id, product_name, price, url) "
            "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")
        # data_user = (rowid, date, listing_id, product_id, product_name, price, url)
        # Insert new row
        cursor.execute(create_query, row)
        # lastRecordId = cursor.lastrowid

        # Make sure data is committed to the database
        # self.cnx.commit()
        # cursor.close()
        print("Item saved")

        product_id = row['product_id']
        if product_id not in existing_ids:
            create_query = ("INSERT INTO " + self.table2 +
                "(product_rowid, date, listing_id, product_id, product_name, price, url) "
                "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")
            # data_user = (rowid, date, listing_id, product_id, product_name, price, url)

            # new_cursor = self.cnx.cursor()
            cursor.execute(create_query, row)
            # lastRecordId = cursor.lastrowid
            # self.cnx.commit()
            # new_cursor.close()
            print("New Item saved")
        self.cnx.commit()

x33g5p2x  1#

You can eliminate the first query in the save method by running it once at initialization, storing the result as an instance variable, and then updating it with new entries inside the save method. Another performance boost can come from using the executemany feature of the MySQL cursor: pass all of the rows to the save method at once instead of one at a time.
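The key call is cursor.executemany() with the same parameterized INSERT; as far as I know, mysql-connector-python folds a batch of simple INSERT ... VALUES statements into a single multi-row INSERT, which is where most of the speedup comes from. A minimal sketch (rows being the list of cached item dicts):

cursor = self.cnx.cursor()
cursor.executemany(
    "INSERT INTO products "
    "(rowid, date, listing_id, product_id, product_name, price, url) "
    "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, "
    "%(product_name)s, %(price)s, %(url)s)",
    rows)   # rows: list of dicts whose keys match the named placeholders
self.cnx.commit()
cursor.close()

The full revised pipeline then looks like this: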

import mysql.connector
from mysql.connector import errorcode

class Pipeline:
    table = 'products'
    table2 = 'new_products'
    conf = {
        'host': 'localhost',
        'user': 'xxxxxx',
        'password': 'xxxxxx',
        'database': 'xxxxxxx',
        'raise_on_warnings': True
    }

    def __init__(self,**kwargs):
        self._rows = []   #  store rows temporarily
        self._unique_products = [] # unique product rows
        self._cached_rows = 0    # number of cached rows
        self._cache_limit = 1000   # limit before saving to database
        self.cnx = self.mysql_connect()
        self.existing_ids = self.get_product_ids()

    def open_spider(self, spider):
        print("spider open")

    def save_all(self):    # calls self.save method for all cached rows
        if len(self._rows) > 0:
            self.save(self._rows, self._unique_products)
            self._cached_rows = 0   # reset the count
            self._rows = []         # reset the cache
            self._unique_products = []

    def process_item(self, item, spider):
        row = dict(item)
        product_id = row['product_id']
        if product_id not in self.existing_ids:
            self._unique_products.append(row)
            self.existing_ids.add(product_id)
        self._rows.append(row)
        self._cached_rows += 1
        if self._cached_rows >= self._cache_limit: # checks if limit reached
            self.save_all()      # if it has been reached then save all rows
        return item

    def close_spider(self, spider):
        self.save_all()      # Saves remaining rows once spider closes
        self.cnx.close()

    def mysql_connect(self):
        try:
            return mysql.connector.connect(**self.conf)
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your user name or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
            else:
                print(err)

    def get_product_ids(self):
        cursor = self.cnx.cursor()
        cursor.execute("SELECT DISTINCT product_id FROM products;")
        return set([row[0] for row in cursor.fetchall()])

    def save(self, rows, products):
        cursor = self.cnx.cursor()
        create_query = ("INSERT INTO " + self.table +
            "(rowid, date, listing_id, product_id, product_name, price, url) "
            "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")
        # Insert new row
        cursor.executemany(create_query, rows)
        # Make sure data is committed to the database
        self.cnx.commit()
        print("Saved {} items, last row id: {}".format(len(rows), cursor.lastrowid))
        cursor.close()   # read lastrowid before closing the cursor

        create_query = ("INSERT INTO " + self.table2 +
            "(product_rowid, date, listing_id, product_id, product_name, price, url) "
            "VALUES (%(rowid)s, %(date)s, %(listing_id)s, %(product_id)s, %(product_name)s, %(price)s, %(url)s)")
        if products:  # this batch may contain no previously unseen product_ids
            new_cursor = self.cnx.cursor()
            new_cursor.executemany(create_query, products)
            self.cnx.commit()
            print("Saved {} new items, last row id: {}".format(len(products), new_cursor.lastrowid))
            new_cursor.close()

I'm actually curious how big the performance gain will be, so please share the timing difference.
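
If you want to measure it, one quick way (just a sketch using time.perf_counter; the print is only for a rough comparison) is to time each flush inside save_all():

import time

class Pipeline:
    # ... rest of the class as above ...

    def save_all(self):    # same as before, but timed
        if len(self._rows) > 0:
            start = time.perf_counter()
            self.save(self._rows, self._unique_products)
            print("Flushed {} rows in {:.2f} s".format(len(self._rows), time.perf_counter() - start))
            self._cached_rows = 0
            self._rows = []
            self._unique_products = []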
