在编写Parquet文件时可以重叠分区吗

nxagd54h  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(378)

我有一个非常大的Dataframe大约2tb的大小。我可以用两列对它们进行分区: MODULE 以及 DATE 如果我用 MODULE 例如,每个模块可以有相同的日期 MODULE A 可能有约会 2020-07-01 , 2020-07-02 以及 MODULE B 可能有 2020-07-01 , 2020-07-05 我需要先用 MODULE 在我最终划分和存储它们之前做一些聚合和连接 DATE . 我用pyspark来编码。
在完成聚合和按模块连接之后,我将它附加到一个Parquet文件,并将整个Parquet文件加载到一个Dataframe中,然后按日期对其进行分区。问题是spark作业由于内存问题而终止。我可以直接按日期分区吗 MODULE 分区?所以分区看起来像这样:输入格式: s3://path/MODULE=A --> s3://path/DATE=2020-07-01 两个模块 A & B 存在于分区中 DATE=2020-07-01 ?
这是我的原始代码,由于在群集中的时间过长和内存不足而失败:

inpath="s3://path/file/"
outpath="s3://path/file_tmp.parquet"
fs = s3fs.S3FileSystem(anon=False)
uvaDirs = fs.ls(inpath)

# Load Data by Module

for uvapath in uvaDirs:
    customPath='s3://' + uvapath + '/'
    df1=spark.read.parquet(customPath)
    #Perform aggregations and joins
    df1.write.mode('append').parquet(outpath)

# Load - partition by date

df2=spark.read.parquet("s3://path/file_tmp.parquet")
df2.write.mode('overwrite').partitionBy("DATE").parquet("s3://path/final.parquet")

它成功地创建了 file_tmp.parquet 但在按日期加载和分区时失败。任何帮助都将不胜感激!谢谢您

zrfyljdw

zrfyljdw1#

像delta datasource一样,delta存储为parquet

(spark.read
 .format("delta")
 .load(path)
 .where(partition)
 .repartition(numFilesPerPartition)
 .write
 .option("dataChange", "false")
 .format("delta")
 .mode("overwrite")
 .option("replaceWhere", partition)
 .save(path))

// clean old file
val deltaTable = DeltaTable.forPath(spark, tablePath)
deltaTable.vacuum(0)

参考:https://docs.delta.io/latest/best-practices.html#-delta压缩文件和python语言

相关问题