版本:emr-5.33.1、pyspark2.4.7
尝试迭代读取数据子集,转换它,然后将它们保存到保存桶。路径看起来像这样:{bucket_uri}/folder1/date=20220101/
当没有这样的date=20220101分区文件夹时,可以写入,但它输出
pyspark.sql.utils.AnalysisException: 'path ... already exists'
我的代码如下所示:
output_path = 'bucket_uri/folder1/date=20220101'
for i in range(0, 100, 10):
pdf = spark.read.parquet(file_list[i:i+10])
.... doing transformations....
pdf_transformed.write.parquet(output_path)
我可以通过在每次迭代中将pyspark df写入不同的文件夹bucket_uri/folder1/date=20220101/iteration{i}
来添加额外的层,但我希望将所有 parquet 文件保存在一个文件夹中。
1条答案
按热度按时间wfveoks01#
在将 Dataframe 写入S3时,您需要指定模式-追加或覆盖。追加模式将保留现有数据并将新数据添加到同一文件夹,而覆盖模式将删除现有数据并写入新数据。因此,最后,它归结为您是否希望在输出路径中保留现有数据。