postgresql 使用psycopg2批量更新Postgres DB中的行

yhived7q  于 2023-01-05  发布在  PostgreSQL
关注(0)|答案(2)|浏览(276)

我们需要对Postgres DB中的许多行进行批量更新,并希望使用下面的SQL语法。

UPDATE table_to_be_updated
SET msg = update_payload.msg
FROM (VALUES %(update_payload)s) AS update_payload(id, msg)
WHERE table_to_be_updated.id = update_payload.id
RETURNING *

尝试1 -传递值

我们需要向psycopg 2查询传递一个嵌套的可迭代格式,对于update_payload,我尝试过传递列表的列表、元组的列表和元组的元组,但都失败了,出现了各种错误。

尝试2 -使用__conform__编写自定义类

我试着编写一个自定义类来执行这些操作,它将返回

(VALUES (row1_col1, row1_col2), (row2_col1, row2_col2), (...))

我按照这里的说明编写了这样的代码,但很明显我做错了什么,例如,在这种方法中,我必须处理表中所有值的引用,这会很麻烦,而且容易出错。

class ValuesTable(list):
    def __init__(self, *args, **kwargs):
        super(ValuesTable, self).__init__(*args, **kwargs)

    def __repr__(self):
        data_in_sql = ""
        for row in self:
            str_values = ", ".join([str(value) for value in row])
            data_in_sql += "({})".format(str_values)
        return "(VALUES {})".format(data_in_sql)

    def __conform__(self, proto):
        return self.__repr__()

    def getquoted(self):
        return self.__repr__()

    def __str__(self):
        return self.__repr__()

**编辑:**如果使用另一种语法可以更快/更干净地完成批量更新,而不是我最初的问题,那么我洗耳恭听!

mwyxok5s

mwyxok5s1#

要求:

  • Postgres表,由字段id和msg(以及可能的其他字段)组成
  • 包含msg新值的Python数据
  • 应通过psycopg2更新Postgres表
    • 示例表**
CREATE TABLE einstein(
   id CHAR(5) PRIMARY KEY,
   msg VARCHAR(1024) NOT NULL
);
    • 试验数据**
INSERT INTO einstein VALUES ('a', 'empty');
INSERT INTO einstein VALUES ('b', 'empty');
INSERT INTO einstein VALUES ('c', 'empty');
    • Python程序**

假设的,自包含的例子程序与报价的一个著名的物理学家。

import sys
import psycopg2
from psycopg2.extras import execute_values

def print_table(con):
    cur = con.cursor()
    cur.execute("SELECT * FROM einstein")
    rows = cur.fetchall()
    for row in rows:
        print(f"{row[0]} {row[1]}")

def update(con, einstein_quotes):
    cur = con.cursor()
    execute_values(cur, """UPDATE einstein 
                           SET msg = update_payload.msg 
                           FROM (VALUES %s) AS update_payload (id, msg) 
                           WHERE einstein.id = update_payload.id""", einstein_quotes)
    con.commit()

def main():
    con = None
    einstein_quotes = [("a", "Few are those who see with their own eyes and feel with their own hearts."),
                       ("b", "I have no special talent. I am only passionately curious."),
                       ("c", "Life is like riding a bicycle. To keep your balance you must keep moving.")]

    try:
        con = psycopg2.connect("dbname='stephan' user='stephan' host='localhost' password=''")
        print_table(con)
        update(con, einstein_quotes)
        print("rows updated:")
        print_table(con)

    except psycopg2.DatabaseError as e:

        print(f'Error {e}')
        sys.exit(1)

    finally:

        if con:
            con.close()

if __name__ == '__main__':
    main()
    • 准备好的语句备选项**
import sys
import psycopg2
from psycopg2.extras import execute_batch

def print_table(con):
    cur = con.cursor()
    cur.execute("SELECT * FROM einstein")
    rows = cur.fetchall()
    for row in rows:
        print(f"{row[0]} {row[1]}")

def update(con, einstein_quotes, page_size):
    cur = con.cursor()
    cur.execute("PREPARE updateStmt AS UPDATE einstein SET msg=$1 WHERE id=$2")
    execute_batch(cur, "EXECUTE updateStmt (%(msg)s, %(id)s)", einstein_quotes, page_size=page_size)
    cur.execute("DEALLOCATE updateStmt")
    con.commit()

def main():
    con = None
    einstein_quotes = ({"id": "a", "msg": "Few are those who see with their own eyes and feel with their own hearts."},
                       {"id": "b", "msg": "I have no special talent. I am only passionately curious."},
                       {"id": "c", "msg": "Life is like riding a bicycle. To keep your balance you must keep moving."})

    try:
        con = psycopg2.connect("dbname='stephan' user='stephan' host='localhost' password=''")
        print_table(con)
        update(con, einstein_quotes, 100)  #choose some meaningful page_size here
        print("rows updated:")
        print_table(con)

    except psycopg2.DatabaseError as e:

        print(f'Error {e}')
        sys.exit(1)

    finally:

        if con:
            con.close()

if __name__ == '__main__':
    main()
    • 产出**

上述程序将向调试控制台输出以下内容:

a     empty
b     empty
c     empty
rows updated:
a     Few are those who see with their own eyes and feel with their own hearts.
b     I have no special talent. I am only passionately curious.
c     Life is like riding a bicycle. To keep your balance you must keep moving.
bbuxkriu

bbuxkriu2#

简短答案!使用execute_values(curs, sql, args),请参阅文档

对于那些寻找简单明了的答案。示例代码批量更新用户;

from psycopg2.extras import execute_values

sql = """
    update users u
    set
        name = t.name,
        phone_number = t.phone_number
    from (values %s) as t(id, name, phone_number)
    where u.id = t.id;
"""

rows_to_update = [
    (2, "New name 1", '+923002954332'),
    (5, "New name 2", '+923002954332'),
]
curs = conn.cursor()  # Assuming you already got the connection object
execute_values(curs, sql, rows_to_update)

如果你使用uuid作为主键,并且还没有在psycopg 2中注册uuid数据类型(将uuid保存为python字符串),你可以使用u.id = t.id::uuid这个条件。

相关问题