postgresql 向大型Postgres数据库的所有行添加列

ibps3vxo  于 2023-06-22  发布在  PostgreSQL
关注(0)|答案(1)|浏览(196)

我有一个PostgreSQL数据库,我需要根据Python中执行的一些计算来更新特定的列。这是运行非常慢,似乎慢下来的时间越长的代码运行(它通过了约3%的行在12个小时)。
我不认为我是基于Windows任务管理器的资源受限。我有RAM和CPU可用。磁盘活动率为100%,但执行速度低于NVME读/写速度。
该数据库有大约16亿行。id是主键。我使用psycopg 2与它交互,如下所示:

import psycopg2

def retrieve_raw_content_batch(batch_size):
    with db_connect() as conn:
        with conn.cursor('testit') as cursor:
            cursor.execute("SELECT id, columnoftext FROM table;")
            while True:
                rows = cursor.fetchmany(batch_size)
                if not rows:
                    break

                yield rows

def update_database(upload_list):
    with db_connect() as conn:
        with conn.cursor() as cursor:
            update_query = "UPDATE table SET col1 = %s, col2 = %s WHERE id = %s"
            psycopg2.extras.execute_batch(cursor, update_query, upload_list)

def do_stuff(row_batch): 
    for rows in row_batch:
        upload_list = []
        for row in rows:
            #calculate to get id, col1, col2
            upload_list.append((id, col1, col2))
        update_database(upload_list)

def main(batch_size):
    rows_batch = retrieve_raw_content_batch(batch_size)
    do_stuff(rows_batch)

我试图通过将max_wal_size增加到10 GB来修改postgresql.conf文件,但我对Postgres相对较陌生。我不知道如何优化我的数据库配置,或者这是否是问题所在。
我还想知道用COPY创建一个新表并在后面使用JOIN,而不是每行单独使用UPDATE是否更有意义。

vh0rcniy

vh0rcniy1#

它花了这么长时间的原因是你对每一行都使用了UPDATE,这意味着你有16亿个事务-这将永远花不完。
如果将UPDATE拆分为多个批,则可以大大减少事务的数量。postgres中最安全和最有效的方法是创建一个临时表来批量更新和传输。解决方案相对简单,只要它适合您的使用情况:

def update_database_in_batches(upload_list, batch_size):
    with db_connect() as conn:
        with conn.cursor() as cursor:
            for i in range(0, len(upload_list), batch_size):
                temp_table_query = """
                CREATE TEMP TABLE temp_table (id INTEGER, col1 INTEGER, col2 INTEGER)
                ON COMMIT DROP;
                """
                cursor.execute(temp_table_query)

                # Batch insert to the temp_table
                insert_query = "INSERT INTO temp_table (id, col1, col2) VALUES %s"
                psycopg2.extras.execute_values(cursor, insert_query, upload_list[i:i+batch_size])

                # Update the main table from the temporary table
                update_query = """
                UPDATE table
                SET col1 = temp_table.col1, col2 = temp_table.col2
                FROM temp_table
                WHERE table.id = temp_table.id;
                """
                cursor.execute(update_query)

您必须指定batch_size等,可能还需要进行一些小的修改,但这是通用结构

相关问题