因此,我们每小时都有很多lambda在运行,它们从API查询数据,将json响应转换为 Dataframe ,并使用method='multi'
插入 Dataframe 以进行批量插入。
但是所有这些lambda都给我们的单节点红移集群带来了很大的压力(是的,我们很小,现在还不能增加节点)。每当lambda将数据插入我们的集群时,CPU使用率就会达到100%,这会导致其他作业超时。
为了加快插入速度,我正在测试awswrangler的红移副本。
1.将json响应转换为 Dataframe
1.使用www.example.com _parquet()将 Dataframe 导出到parquetwr.s3.to
1.使用wr.redshift.copy()将parquet附加到红移表
使用awswrangler测试卸载和复制
1.使用wr.redshift.unload()
从示例表中捕获了几行的 Dataframe
1.使用wr.redshift.copy()
将 Dataframe 加载到类似的模式测试表中
复制失败,抛出Spectrum Scan Error: <s3path> has an incompatible Parquet schema for column
:
- TIMESTAMP列转换为CHAR类型
- INT2列转换为INT64
我希望导出的 Dataframe 具有与表或副本相似的架构,以解决数据类型不匹配的问题。
卸载调用:
test_df = wr.redshift.unload(
sql=f"SELECT * FROM {schema}.{table_name};",
path="s3://bucket-name/copy_test/",
keep_files=False,
con=rs_con
)
复制呼叫
wr.redshift.copy(
df=test_df,
table=table_name,
schema=schema,
keep_files=False,
path='s3://bucket-name/copy_test/',
use_column_names=True,
index=False,
con=rs_con
)
1条答案
按热度按时间0mkxixxg1#
您可以确保Parquet文件的模式与目标Redshift表的模式匹配,方法是在使用
wr.s3.to_parquet()
写入Parquet文件时为列指定正确的数据类型,或者在调用wr.redshift.copy()
时使用schema参数指定目标Redshift表模式。您还可以使用
wr.redshift.copy()
的data_conversion_params
参数来指定在COPY操作期间应如何处理数据类型转换。如果您需要在将数据从Parquet文件加载到Redshift时执行默认情况下不支持的数据类型转换,则此功能非常有用。