postgresql 在psycopg2中构建动态更新查询

cwtwac6a  于 2023-01-25  发布在  PostgreSQL
关注(0)|答案(5)|浏览(191)

我必须为postgresql构造一个动态更新查询。它是动态的,因为事先我必须确定要更新哪些列。
给定一个示例表:

create table foo (id int, a int, b int, c int)

然后,我将以编程方式构造"set"子句

_set = {}
_set['a'] = 10
_set['c'] = NULL

之后我必须构建更新查询。我在这里卡住了。我必须构造这个sql更新命令:

update foo set a = 10, b = NULL where id = 1

如何使用psycopg2参数化命令来实现这一点?(即,如果dict不为空,则循环通过dict并构建set子句)?

    • 更新**

当我睡觉的时候,我自己找到了解决办法。它是动态的,正是我想要的:-)

create table foo (id integer, a integer, b integer, c varchar)

updates = {}
updates['a'] = 10
updates['b'] = None
updates['c'] = 'blah blah blah'
sql = "upgrade foo set %s where id = %s" % (', '.join("%s = %%s" % u for u in updates.keys()), 10)
params = updates.values()
print cur.mogrify(sql, params)
cur.execute(sql, params)

结果就是我所需要的内容和方式(特别是可空列和可引用列):

"upgrade foo set a = 10, c = 'blah blah blah', b = NULL where id = 10"
t9aqgxwy

t9aqgxwy1#

实际上有一种稍微干净一点的方法,使用the alternative column-list syntax

sql_template = "UPDATE foo SET ({}) = %s WHERE id = {}"
sql = sql_template.format(', '.join(updates.keys()), 10)
params = (tuple(addr_dict.values()),)
print cur.mogrify(sql, params)
cur.execute(sql, params)
pbossiut

pbossiut2#

使用psycopg2.sql- SQL字符串合成模块
该模块包含的对象和函数可用于以方便和安全的方式动态生成SQL。

from psycopg2 import connect, sql

conn = connect("dbname=test user=postgres")

upd = {'name': 'Peter', 'age': 35, 'city': 'London'}
ref_id = 12

sql_query = sql.SQL("UPDATE people SET {data} WHERE id = {id}").format(
    data=sql.SQL(', ').join(
        sql.Composed([sql.Identifier(k), sql.SQL(" = "), sql.Placeholder(k)]) for k in upd.keys()
    ),
    id=sql.Placeholder('id')
)
upd.update(id=ref_id)
with conn:
    with conn.cursor() as cur:
        cur.execute(sql_query, upd)
conn.close()

在关闭连接之前运行print(sql_query.as_string(conn))将显示以下输出:

UPDATE people SET "name" = %(name)s, "age" = %(age)s, "city" = %(city)s WHERE id = %(id)s
5kgi1eie

5kgi1eie3#

不需要动态SQL,假设a不可空,b可空。
如果要同时更新ab

_set = dict(
    id = 1,
    a = 10,
    b = 20, b_update = 1
)
update = """
    update foo
    set
        a = coalesce(%(a)s, a), -- a is not nullable
        b = (array[b, %(b)s])[%(b_update)s + 1] -- b is nullable
    where id = %(id)s
"""
print cur.mogrify(update, _set)
cur.execute(update, _set)

输出:

update foo
set
    a = coalesce(10, a), -- a is not nullable
    b = (array[b, 20])[1 + 1] -- b is nullable
where id = 1

如果不想更新任何内容:

_set = dict(
    id = 1,
    a = None,
    b = 20, b_update = 0
)

输出:

update foo
set
    a = coalesce(NULL, a), -- a is not nullable
    b = (array[b, 20])[0 + 1] -- b is nullable
where id = 1
dgiusagp

dgiusagp4#

一个不带python格式的选项,使用psycopg2的AsIs函数作为列名(尽管这并不妨碍在列名上进行SQL注入)。

update_statement = f'UPDATE foo SET (%s) = %s WHERE id_column=%s'
columns = data.keys()
values = [data[column] for column in columns]
query = cur.mogrify(update_statement, (AsIs(','.join(columns)), tuple(values), id_value))
sulc1iza

sulc1iza5#

这是我在一个泛型DatabaseHandler类中的解决方案,当使用pd.DataFrame作为源代码时,它提供了很大的灵活性。

def update_data(
        self,
        table: str,
        df: pd.DataFrame,
        indexes: Optional[list] = None,
        column_map: Optional[dict] = None,
        commit: Optional[bool] = False,
    ) -> int:
        """Update data in the media database

        Args:
            table (str): the "tablename" or "namespace.tablename"
            df (pandas.DataFrame): dataframe containing the data to update
            indexes (list): the list of columns in the table that will be in the WHERE clause of the update statement.
                If not provided, will use df indexes.
            column_map (dict): dictionary mapping the columns in df to the columns in the table
                columns in the column_map that are also in keys will not be updated
                Key = df column.
                Value = table column.
            commit (bool): if True, the transaction will be committed (default=False)

            Notes:
                If using a column_map, only the columns in the data_map will be updated or used as indexes.
                Order does not matter. If not using a column_map, all columns in df must exist in table.

        Returns:
            int : rows updated
        """
        try:
            if not indexes:
                # Use the dataframe index instead
                indexes = []
                for c in df.index.names:
                    if not c:
                        raise Exception(
                            f"Dataframe contains indexes without names. Unable to determine update where clause."
                        )
                    indexes.append(c)

            update_strings = []
            tdf = df.reset_index()
            if column_map:
                target_columns = [c for c in column_map.keys() if c not in indexes]
            else:
                column_map = {c: c for c in tdf.columns}
                target_columns = [c for c in df.columns if c not in indexes]

            for i, r in tdf.iterrows():
                upd_params = ", ".join(
                    [f"{column_map[c]} = %s" for c in target_columns]
                )
                upd_list = [r[c] if pd.notna(r[c]) else None for c in target_columns]
                upd_str = self._cur.mogrify(upd_params, upd_list).decode("utf-8")

                idx_params = " AND ".join([f"{column_map[c]} = %s" for c in indexes])
                idx_list = [r[c] if pd.notna(r[c]) else None for c in indexes]
                idx_str = self._cur.mogrify(idx_params, idx_list).decode("utf-8")

                update_strings.append(f"UPDATE {table} SET {upd_str} WHERE {idx_str};")
            full_update_string = "\n".join(update_strings)
            print(full_update_string)  # Debugging
            self._cur.execute(full_update_string)
            rowcount = self._cur.rowcount
            if commit:
                self.commit()
            return rowcount
        except Exception as e:
            self.rollback()
            raise e

示例用法:

>>> df = pd.DataFrame([
    {'a':1,'b':'asdf','c':datetime.datetime.now()}, 
    {'a':2,'b':'jklm','c':datetime.datetime.now()}
])

>>> cls.update_data('my_table', df, indexes = ['a'])
UPDATE my_table SET b = 'asdf', c = '2023-01-17T22:13:37.095245'::timestamp WHERE a = 1;
UPDATE my_table SET b = 'jklm', c = '2023-01-17T22:13:37.095250'::timestamp WHERE a = 2;

>>> cls.update_data('my_table', df, indexes = ['a','b'])
UPDATE my_table SET c = '2023-01-17T22:13:37.095245'::timestamp WHERE a = 1 AND b = 'asdf';
UPDATE my_table SET c = '2023-01-17T22:13:37.095250'::timestamp WHERE a = 2 AND b = 'jklm';

>>> cls.update_data('my_table', df.set_index('a'), column_map={'a':'db_a','b':'db_b','c':'db_c'} )
UPDATE my_table SET db_b = 'asdf', db_c = '2023-01-17T22:13:37.095245'::timestamp WHERE db_a = 1;
UPDATE my_table SET db_b = 'jklm', db_c = '2023-01-17T22:13:37.095250'::timestamp WHERE db_a = 2;

但是请注意,由于它生成where子句的方式,这对于SQL注入是不安全的。

相关问题