如何使用Pandas DataFrame对数据库表的现有行执行UPDATE?

t30tvxxf  于 2023-01-07  发布在  其他
关注(0)|答案(4)|浏览(368)

我尝试查询MySql数据库表的子集,将结果输入Pandas DataFrame,修改一些数据,然后将更新的行写回到同一个表中。(〈50,000),因此返回整个表并执行df.to_sql(tablename,engine, if_exists='replace')不是一个可行的选择。是否有一种直接的方法可以UPDATE已更改的行,而无需遍历DataFrame中的每一行?
我知道这个项目,它试图模拟一个“upsert”工作流,但似乎它只完成了插入新的非重复行的任务,而不是更新现有行的一部分:
GitHub Pandas-to_sql-upsert
以下是我试图在更大范围内实现的目标的框架:

import pandas as pd
from sqlalchemy import create_engine
import threading

#Get sample data
d = {'A' : [1, 2, 3, 4], 'B' : [4, 3, 2, 1]}
df = pd.DataFrame(d)

engine = create_engine(SQLALCHEMY_DATABASE_URI)

#Create a table with a unique constraint on A.
engine.execute("""DROP TABLE IF EXISTS test_upsert """)
engine.execute("""CREATE TABLE test_upsert (
                  A INTEGER,
                  B INTEGER,
                  PRIMARY KEY (A)) 
                  """)

#Insert data using pandas.to_sql
df.to_sql('test_upsert', engine, if_exists='append', index=False)

#Alter row where 'A' == 2
df_in_db.loc[df_in_db['A'] == 2, 'B'] = 6

现在,我想将df_in_db写回到'test_upsert'表中,并反映更新后的数据。
这个SO问题非常相似,其中一条评论建议使用一个“sqlalchemy表类”来执行任务。
Update table using sqlalchemy table class
如果这是实现它的最佳(唯一?)方式,有人能扩展一下我将如何为我上面的特定情况实现它吗?

2ic8powd

2ic8powd1#

我认为最简单的方法是:
首先删除那些将要被“upserted”的行,这可以在一个循环中完成,但是对于更大的数据集(5K+行)来说效率不是很高,所以我将DF的这个切片保存到一个临时的MySQL表中:

# assuming we have already changed values in the rows and saved those changed rows in a separate DF: `x`
x = df[mask]  # `mask` should help us to find changed rows...

# make sure `x` DF has a Primary Key column as index
x = x.set_index('a')

# dump a slice with changed rows to temporary MySQL table
x.to_sql('my_tmp', engine, if_exists='replace', index=True)

conn = engine.connect()
trans = conn.begin()

try:
    # delete those rows that we are going to "upsert"
    engine.execute('delete from test_upsert where a in (select a from my_tmp)')
    trans.commit()

    # insert changed rows
    x.to_sql('test_upsert', engine, if_exists='append', index=True)
except:
    trans.rollback()
    raise

PS我没有测试这个代码,所以它可能有一些小错误,但它应该给予你一个想法...

oxalkeyp

oxalkeyp2#

使用Panda's to_sql“方法”参数和sqlalchemy的mysql insert on_duplicate_key_update特性的MySQL特定解决方案:

def create_method(meta):
    def method(table, conn, keys, data_iter):
        sql_table = db.Table(table.name, meta, autoload=True)
        insert_stmt = db.dialects.mysql.insert(sql_table).values([dict(zip(keys, data)) for data in data_iter])
        upsert_stmt = insert_stmt.on_duplicate_key_update({x.name: x for x in insert_stmt.inserted})
        conn.execute(upsert_stmt)

    return method

engine = db.create_engine(...)
conn = engine.connect()
with conn.begin():
    meta = db.MetaData(conn)
    method = create_method(meta)
    df.to_sql(table_name, conn, if_exists='append', method=method)
wgx48brx

wgx48brx3#

我之前一直在挣扎现在我找到了办法。
基本上就是创建一个单独的数据框,在其中保存只需更新的数据。

df #updating data in dataframe

s_update = "" #String of updations

# Loop through the data frame

for i in range(len(df)):
    s_update += "update your_table_name set column_name = '%s' where column_name = '%s';"%(df[col_name1][i], df[col_name2][i])

现在将s_update传递给cursor.executeengine.execute(无论您在何处执行SQL查询)
这将立即更新您的数据。

tf7tbtn2

tf7tbtn24#

下面是一个通用函数,它将更新每一行(但同时更新行中的所有值)

def update_table_from_df(df, table, where):
    '''Will take a dataframe and update each specified row in the SQL table
        with the DF values -- DF columns MUST match SQL columns
        WHERE statement should be triple-quoted string
        Will not update any columns contained in the WHERE statement'''
    update_string = f'UPDATE {table} set '
    for idx, row in df.iterrows():
        upstr = update_string
        for col in list(df.columns):
            if (col != 'datetime') & (col not in where):
                if col != df.columns[-1]:
                    if type(row[col] == str):
                        upstr += f'''{col} = '{row[col]}', '''
                    else:
                        upstr += f'''{col} = {row[col]}, '''
                else:
                    if type(row[col] == str):
                        upstr += f'''{col} = '{row[col]}' '''
                    else:
                        upstr += f'''{col} = {row[col]} '''
        upstr += where
        cursor.execute(upstr)
        cursor.commit()```

相关问题