Python too slow when inserting millions of rows into an SQLite database

0ve6wy6x  asked 2022-11-15  in SQLite

For my chess engine I use statistics, collected from millions of games, to pick the best move. What I am interested in is the current move, the next move, and how many times that next move was played after the current one.
With a Python dictionary stored on disk via Pickle, the file became too large and was hard to update with new games, so I decided to use SQLite instead.
I created a class MovesDatabase:

import os
import sqlite3


class MovesDatabase:

    def __init__(self, work_dir):
        self.con = sqlite3.connect(os.path.join(work_dir, "moves.db"))
        # Trade a little durability for bulk-insert speed.
        self.con.execute('PRAGMA temp_store = MEMORY')
        self.con.execute('PRAGMA synchronous = NORMAL')
        self.con.execute('PRAGMA journal_mode = WAL')
        self.cur = self.con.cursor()

        self.cur.execute("CREATE TABLE IF NOT EXISTS moves("
                         "move TEXT,"
                         "next TEXT,"
                         "count INTEGER DEFAULT 1);")

move and next hold the state of the board as a string, a truncated FEN. Examples (a short sketch of how these strings are built follows the list):

  • rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR
  • r1b1k1nr/p2p1pNp/n2B4/1p1NP2P/6P1/3P1Q2/P1P1K3/q5b1
  • 8/8/8/4p1K1/2k1P3/8/8/8 b
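
As a minimal sketch (using python-chess, as the question does), such a state string is just the first two fields of the full FEN, i.e. the piece placement plus the side to move:

import chess

board = chess.Board()
# board.fen() -> 'rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1'
placement, turn = board.fen().split(' ')[:2]
state = placement + ' ' + turn  # 'rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w'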

The method below takes a game file and extracts the moves, inserting the pair (move, next) if it is new, or updating its count if the pair already exists in the database:

def insert_moves_from_file(self, file: str):
    print("Extracting moves to database from " + file)

    count = 0

    with open(file) as games_file:
        game = chess.pgn.read_game(games_file)

        while game is not None:
            batch = []
            board = game.board()
            state_one = board.fen().split(' ')[0] + ' ' + board.fen().split(' ')[1]

            for move in game.mainline_moves():
                board.push(move)
                fen = board.fen().split(' ')
                state_two = fen[0] + ' ' + fen[1]

                res = self.cur.execute("SELECT * FROM moves WHERE move=? AND next=?",
                                       (state_one, state_two))
                res = res.fetchall()

                if len(res) != 0:
                    self.cur.execute("UPDATE moves SET count=count+1 WHERE move=? AND next=?",
                                     (state_one, state_two))
                else:
                    batch.append((state_one, state_two))

                state_one = state_two

            self.cur.executemany("INSERT INTO moves(move, next) VALUES"
                                 "(?, ?)", batch)
            count += 1
            print('\r' "%d games added to the database.." % count, end='')
            game = chess.pgn.read_game(games_file)

    self.con.commit()
    print("\n Finished!")

The pair (move, next) is unique.
I tested with a file containing about 4 million (move, next) pairs. It starts out inserting/updating at around 3,000 rows per second, but by roughly 50K rows it slows to about 100 rows per second and keeps dropping. I designed this method to process multiple game files, which is why I chose an SQL database in the first place.

jmp7cifd #1

It is not the INSERT that is slow here.
Your move and next columns have no index, so any SELECT or UPDATE involving those columns requires a full table scan.
Since (move, next) is always unique, you should add a UNIQUE index on it. That will also automatically speed up queries on the move/next pair (and, via the leftmost-prefix rule, on move alone, though not on next alone).
To create that index on the existing table:

CREATE UNIQUE INDEX ix_move_next ON moves (move, next);
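
To check that the index is really being used, inspect the query plan; a minimal sketch, assuming con is the sqlite3 connection (the exact wording of the output varies slightly between SQLite versions):

plan = con.execute(
    "EXPLAIN QUERY PLAN "
    "SELECT * FROM moves WHERE move='foo' AND next='bar'").fetchall()
print(plan)
# With the index the plan should report something like
# 'SEARCH moves USING INDEX ix_move_next (move=? AND next=?)'
# instead of a full table scan ('SCAN moves' / 'SCAN TABLE moves').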

Finally, once the index exists, you can also drop the whole SELECT/UPDATE code and use an upsert instead (supported in SQLite 3.24.0 and later):

INSERT INTO moves (move, next) VALUES (?, ?) ON CONFLICT (move, next) DO UPDATE SET count = count + 1;
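
Inside the loop from the question, that single statement replaces the whole SELECT/UPDATE/INSERT branch; a minimal sketch reusing the question's state_one/state_two names:

cur.execute(
    "INSERT INTO moves (move, next) VALUES (?, ?) "
    "ON CONFLICT (move, next) DO UPDATE SET count = count + 1",
    (state_one, state_two))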

Here is a slight refactoring that achieves about 6,200 moves per second on my machine. (It requires the tqdm library for a nice progress bar, and a pgns/ directory containing your PGN files.)

import glob
import sqlite3
import chess.pgn
import tqdm
from chess import WHITE

def board_to_state(board):
    # These were extracted from the implementation of `board.fen()`
    # so as to avoid doing extra work we don't need.
    bfen = board.board_fen(promoted=False)
    turn = ("w" if board.turn == WHITE else "b")
    return f'{bfen} {turn}'

def insert_game(cur, game):
    batch = []
    board = game.board()
    state_one = board_to_state(board)
    for move in game.mainline_moves():
        board.push(move)
        state_two = board_to_state(board)
        batch.append((state_one, state_two))
        state_one = state_two
    cur.executemany("INSERT INTO moves (move, next) VALUES (?, ?) ON CONFLICT (move, next) DO UPDATE SET count = count + 1", batch)
    n_moves = len(batch)
    return n_moves

def main():
    con = sqlite3.connect("moves.db")
    con.execute('PRAGMA temp_store = MEMORY')
    con.execute('PRAGMA synchronous = NORMAL')
    con.execute('PRAGMA journal_mode = WAL')
    con.execute('CREATE TABLE IF NOT EXISTS moves(move TEXT,next TEXT,count INTEGER DEFAULT 1);')
    con.execute('CREATE UNIQUE INDEX IF NOT EXISTS ix_move_next ON moves (move, next);')

    cur = con.cursor()

    for pgn_file in sorted(glob.glob("pgns/*.pgn")):
        with open(pgn_file) as games_file:
            n_games = 0
            with tqdm.tqdm(desc=pgn_file, unit="moves") as pbar:
                while (game := chess.pgn.read_game(games_file)):
                    n_moves = insert_game(cur, game)
                    n_games += 1
                    pbar.set_description(f"{pgn_file} ({n_games} games)", refresh=False)
                    pbar.update(n_moves)
            con.commit()

if __name__ == '__main__':
    main()
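
Once the table is populated, the statistic the question asks for (how often each follow-up position was reached from a given position) is an ordinary query; a minimal sketch assuming the schema above:

def next_state_counts(cur, state):
    # Most frequent follow-up positions for the given position.
    # The UNIQUE index on (move, next) also covers lookups on move alone.
    return cur.execute(
        "SELECT next, count FROM moves WHERE move = ? ORDER BY count DESC",
        (state,)).fetchall()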
