pandas 解决从 Dataframe 导出的S3上红移表和 parquet 文件之间的数据类型不匹配

inn6fuwd  于 2022-12-25  发布在  其他
关注(0)|答案(1)|浏览(146)

因此,我们每小时都有很多lambda在运行,它们从API查询数据,将json响应转换为 Dataframe ,并使用method='multi'插入 Dataframe 以进行批量插入。
但是所有这些lambda都给我们的单节点红移集群带来了很大的压力(是的,我们很小,现在还不能增加节点)。每当lambda将数据插入我们的集群时,CPU使用率就会达到100%,这会导致其他作业超时。
为了加快插入速度,我正在测试awswrangler的红移副本。
1.将json响应转换为 Dataframe
1.使用www.example.com _parquet()将 Dataframe 导出到parquetwr.s3.to
1.使用wr.redshift.copy()将parquet附加到红移表
使用awswrangler测试卸载和复制
1.使用wr.redshift.unload()从示例表中捕获了几行的 Dataframe
1.使用wr.redshift.copy()将 Dataframe 加载到类似的模式测试表中
复制失败,抛出Spectrum Scan Error: <s3path> has an incompatible Parquet schema for column

  • TIMESTAMP列转换为CHAR类型
  • INT2列转换为INT64

我希望导出的 Dataframe 具有与表或副本相似的架构,以解决数据类型不匹配的问题。
卸载调用:

test_df = wr.redshift.unload(
    sql=f"SELECT * FROM {schema}.{table_name};",
    path="s3://bucket-name/copy_test/",
    keep_files=False,
    con=rs_con
)

复制呼叫

wr.redshift.copy(
    df=test_df,
    table=table_name,
    schema=schema,
    keep_files=False,
    path='s3://bucket-name/copy_test/',
    use_column_names=True,
    index=False,
    con=rs_con
)
0mkxixxg

0mkxixxg1#

您可以确保Parquet文件的模式与目标Redshift表的模式匹配,方法是在使用wr.s3.to_parquet()写入Parquet文件时为列指定正确的数据类型,或者在调用wr.redshift.copy()时使用schema参数指定目标Redshift表模式。
您还可以使用wr.redshift.copy()data_conversion_params参数来指定在COPY操作期间应如何处理数据类型转换。如果您需要在将数据从Parquet文件加载到Redshift时执行默认情况下不支持的数据类型转换,则此功能非常有用。

相关问题