我正在尝试用pyspark覆盖s3中的一个Parquet文件。已为bucket启用版本控制。
我正在使用以下代码:
写入v1:
df_v1.repartition(1).write.parquet(path='s3a://bucket/file1.parquet')
更新v2:
df_v1 = spark.read.parquet("s3a://bucket/file1.parquet")
df_v2 = df_v1.... <- transform
df_v2.repartition(1).write.mode("overwrite").parquet('s3a://bucket/file1.parquet')
但当我读取df_v2时,它包含两次写入的数据。此外,在编写df\u v1时,我可以看到一个part-.snappy.parquet文件,在编写df\u v2后,我可以看到两个。它的行为是附加而不是覆盖。
我错过了什么?谢谢
spark=2.4.4 hadoop=2.7.3
1条答案
按热度按时间nle07wnf1#
这个问题可能是因为您使用的是s3。在s3中,文件系统是基于键/值的,这意味着没有名为
file1.parquet
,只有密钥类似的文件s3a://bucket/file1.parquet/part-XXXXX-b1e8fd43-ff42-46b4-a74c-9186713c26c6-c000.parquet
(这只是一个例子)。所以当你“覆盖”的时候,你应该覆盖文件夹,这是无法检测到的。因此,spark创建新的键:它就像一个“append”模式。
您可能需要编写自己的函数来覆盖“文件夹”--删除名称中包含文件夹的所有键。