将pandas Dataframe 写入AWS athena数据库

hwazgwia  于 2023-05-12  发布在  其他
关注(0)|答案(4)|浏览(98)

我使用pyathena运行了一个查询,并创建了一个pandas数据框。有没有一种方法可以直接将pandas Dataframe 写入AWS athena数据库?例如data.to_sql用于MYSQL数据库。
下面分享一个dataframe代码的例子,以供参考需要写入AWS athena数据库:

data=pd.DataFrame({'id':[1,2,3,4,5,6],'name':['a','b','c','d','e','f'],'score':[11,22,33,44,55,66]})
9lowa7mx

9lowa7mx1#

实现这一目标的另一种现代方法(截至2020年2月)是使用aws-data-wrangler库。它正在编写数据处理中的许多常规(有时令人讨厌)任务。
结合问题的情况,代码如下所示:

import pandas as pd
import awswrangler as wr

data=pd.DataFrame({'id':[1,2,3,4,5,6],'name':['a','b','c','d','e','f'],'score':[11,22,33,44,55,66]})

# Typical Pandas, Numpy or Pyarrow transformation HERE!

wr.pandas.to_parquet(  # Storing the data and metadata to Data Lake
    dataframe=data,
    database="database",
    path="s3://your-s3-bucket/path/to/new/table",
    partition_cols=["name"],
)

这是非常有用的,因为aws-data-wrangler知道从路径中解析表名(但您可以在参数中提供表名),并根据dataframe在Glue catalog中定义适当的类型。
它还有助于使用Athena直接查询pandas dataframe的数据:

df = wr.pandas.read_table(database="dataase", table="table")

所有的过程将是快速和方便。

crcmnpdw

crcmnpdw2#

AWS Athena的存储空间为S3。它只从S3文件中读取数据。以前**不可能像其他数据库一样将数据直接写入Athena数据库。
It was missing support supportinsert into ...
作为workaround,用户可以执行以下步骤来使其工作。

1. You need to write the pandas output to a file, 
2. Save the file to S3 location, from where the AWS Athena is reading.

我希望它能给你一些提示。

更新于2020年1月5日

2019年9月19日,AWS宣布支持插入到Athena,在上面的回答incorrect中做了一个声明,虽然我提供的上述解决方案仍然可以工作,但是随着AWS的宣布,增加了另一个可能的解决方案。
正如AWS Documentation所建议的那样,此功能将允许您发送insert语句,Athena将数据写回source table S3 location中的新文件。因此,从本质上讲,AWS已经解决了将数据写入S3文件的难题。
仅需注意,Athena将插入的数据写入单独的文件。这就是documentation

polhcujo

polhcujo3#

在撰写本文时,最常见的答案是使用旧版本的API,该版本已不再工作。
The documentation现在概述了此往返过程。

import awswrangler as wr
import pandas as pd

df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})

# Storing data on Data Lake
wr.s3.to_parquet(
    df=df,
    path="s3://bucket/dataset/",
    dataset=True,
    database="my_db",
    table="my_table"
)

# Retrieving the data directly from Amazon S3
df = wr.s3.read_parquet("s3://bucket/dataset/", dataset=True)

# Retrieving the data from Amazon Athena
df = wr.athena.read_sql_query("SELECT * FROM my_table", database="my_db")
kninwzqo

kninwzqo4#

一个选项是用途:

pandas_df.to_parquet(file, engine="pyarrow)

首先将其保存到Parquet格式的时态文件中。为此,您需要安装pyarrow依赖项。在本地保存此文件后,可以使用aws sdk for python将其推送到S3。
现在可以通过执行以下查询在Athena中创建新表:

CREATE EXTERNAL TABLE IF NOT EXISTS 'your_new_table'
        (col1 type1, col2 type2)
    PARTITIONED BY (col_partitions_if_neccesary)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    LOCATION 's3 location of your parquet file'
    tblproperties ("parquet.compression"="snappy");

另一个选择是使用pyathena。从他们的官方文档中举个例子:

import pandas as pd
from urllib.parse import quote_plus
from sqlalchemy import create_engine

conn_str = "awsathena+rest://:@athena.{region_name}.amazonaws.com:443/"\
           "{schema_name}?s3_staging_dir={s3_staging_dir}&s3_dir={s3_dir}&compression=snappy"

engine = create_engine(conn_str.format(
    region_name="us-west-2",
    schema_name="YOUR_SCHEMA",
    s3_staging_dir=quote_plus("s3://YOUR_S3_BUCKET/path/to/"),
    s3_dir=quote_plus("s3://YOUR_S3_BUCKET/path/to/")))

df = pd.DataFrame({"a": [1, 2, 3, 4, 5]})
df.to_sql("YOUR_TABLE", engine, schema="YOUR_SCHEMA", index=False, if_exists="replace", method="multi")

在这种情况下,需要依赖关系sqlalchemy。

相关问题