pg8000从CSV复制

mccptt67  于 2023-07-31  发布在  其他
关注(0)|答案(3)|浏览(78)

我在一个AppEngineflask应用程序上使用pg8000,这样我就可以处理CSV文件并将其插入到PSQL示例(托管在AZURE上)中。
为什么使用pg8000而不是psycopg2?->因为应用引擎不支持psycopg 2。
到目前为止,pg8000的文档还没有像psycopg 2那样声明一个函数来实现这一点。我还没有在SO或任何其他地方找到实现这一点的示例,包括文档。
有人知道这是否可能吗?

ibps3vxo

ibps3vxo1#

看看the source code,似乎没有直接导入CSV的方法,代码也没有任何内置的INSERT查询 Package 器,因此可以
您可以选择手动使用CSV阅读器和使用executemany

import csv
import pg8000

conn = pg8000.connect(user="postgres", password="C.P.Snow")
cursor = conn.cursor()

command = 'INSERT INTO book (title) VALUES (%s), (%s) RETURNING id, title'
with open('my-data.csv', 'rb') as fl:
    data = list(csv.reader(fl))
    conn.executemany(command, data)

字符串
需要注意的是,根据数据的大小,使用islice可能更好:

with open('my-data.csv', 'rb') as fl:
    reader = csv.reader(fl)
    slice = itertool.islice(reader, 100)
    while slice:
        conn.executemany(command, slice)
        slice = itertool.islice(reader, 100)

nwsw7zdq

nwsw7zdq2#

正如在另一个问题here中所建议的,您可以在对csv文件应用逻辑和使用csv read方法之前使用next方法。
很抱歉没有插入作为对前一个答案的补充,但我没有足够的分数来这样做。
我遇到了同样的问题,我用下面的方法解决了这个问题。请注意,对我来说,执行many的正确方法是在cursor对象上,而不是在conn上。

conn = pg8000.connect(user='username', password='password', host='host', port=5432, database='database name')
cursor = conn.cursor()

command = "INSERT INTO public.salesforce_accounts (field1, field2, field3, field4, field5, field6) VALUES (%s, %s, %s, %s, %s, %s)"
with open('test.csv', 'r') as file:
    next(file)
    data = list(csv.reader(file))
    cursor.executemany(command, data)

字符串

nwwlzxa7

nwwlzxa73#

所有这些方法对我来说都太慢了。在处理大数据时,COPY命令似乎工作得很好。这在几秒钟内运行,而其他方法则需要10分钟。

# Import pandas and sqlalchemy libraries
import pandas as pd
import sqlalchemy

# Create a database connection pool
pool = sqlalchemy.create_engine() # Fill with login/pg8000 connector

# Read data into dataframe from a csv file
df = pd.read_csv('data.csv')

# Alternatively, create a dataframe from a dictionary of data
# df = pd.Dataframe(data)

# Create an empty buffer to store the csv data
buffer = StringIO()

# Write the dataframe to the buffer without the index column
df.to_csv(buffer, index=False)

# Open a raw connection to the database
connPG8K = pool.raw_connection()

# Create a cursor object to execute queries
cursor = connPG8K.cursor()

# Reset the buffer position to the beginning
buffer.seek(0)

# Copy the data from the buffer to the database table using the csv format and header option. 
# Table must exist. Will overwrite that table.
cursor.execute('COPY "OverwriteTable" FROM STDIN WITH (FORMAT csv, HEADER);', stream=buffer)

# Commit the changes to the database
connPG8K.commit()

字符串

相关问题