sqlite 使用peewee对数百万行的批量更新进行批处理

x6492ojm  于 2023-02-16  发布在  SQLite
关注(0)|答案(1)|浏览(369)
  • 我有一个SQLite3数据库,其中的表有两千万行。
  • 我想更新表中某些列的值(针对所有行)。
  • 我遇到了性能问题(大约每秒只处理1,000行)。
  • 我想继续使用python中的peewee模块与数据库交互。

所以我不确定我的代码是否采用了正确的方法。在尝试了一些想法后,都失败了。我尝试批量执行更新。这里我的第一个解决方案是使用islice在游标上迭代,如下所示:

import math, itertools
from tqdm import tqdm
from cool_project.database import db, MyTable

def update_row(row):
    row.column_a = computation(row.column_d)
    row.column_b = computation(row.column_d)
    row.column_c = computation(row.column_d)

fields = (MyTable.column_a
          MyTable.column_b
          MyTable.column_c)

rows = MyTable.select()
total_rows = rows.count()
page_size = 1000
total_pages = math.ceil(total_rows / page_size)
# Start #
with db.atomic():
    for page_num in tqdm(range(total_pages)):
        page = list(itertools.islice(rows, page_size))
        for row in page: update_row(row)
        MyTable.bulk_update(page, fields=fields)

这失败了,因为它试图将整个查询的结果放入内存,所以我修改了代码以使用paginate函数。

import math
from tqdm import tqdm
from cool_project.database import db, MyTable

def update_row(row):
    row.column_a = computation(row.column_d)
    row.column_b = computation(row.column_d)
    row.column_c = computation(row.column_d)

fields = (MyTable.column_a
          MyTable.column_b
          MyTable.column_c)

rows = MyTable.select()
total_rows = rows.count()
page_size = 1000
total_pages = math.ceil(total_rows / page_size)
# Start #
with db.atomic():
    for page_num in tqdm(range(1, total_pages+1)):
        # Get a batch #
        page = MyTable.select().paginate(page_num, page_size)
        # Update #
        for row in page: update_row(row)
        # Commit #
        MyTable.bulk_update(page, fields=fields)

但它仍然是相当缓慢的,需要〉24小时才能完成。
奇怪的是,速度(以每秒的行数表示)随着时间的推移明显下降,脚本开始时大约为每秒1000行,但半小时后下降到每秒250行。
我错过什么了吗?谢谢!

jgzswidk

jgzswidk1#

问题是双重的--您将所有结果拉入内存,并且使用批量更新API,这非常复杂/特殊,而且对于SQLite来说也完全没有必要。
请尝试以下操作:

def update_row(row):
    row.column_a = computation(row.column_d)
    row.column_b = computation(row.column_d)
    row.column_c = computation(row.column_d)

fields = (MyTable.column_a
          MyTable.column_b
          MyTable.column_c)

rows = MyTable.select()
with db.atomic():
    for row in rows.iterator():  # Add ".iterator()" to avoid caching rows
        update_row(row)
        row.save(only=fields)

相关问题