使用pyspark对parquet文件进行分区和重新分区

kt06eoxx  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(962)

我有一个Parquet地板分割问题,我正在努力解决。我在这个网站和网上读了很多关于分区的资料,但仍然不能解释我的问题。
步骤1:我有一个大数据集(~2tb),它 MODULE 以及 DATE 列,并按 DATE 包括 86 days . 每个 DATE 分区有 21 文件,因此共有 86 * 21 = 1806 文件夹。
步骤2:我需要根据 MODULE 列,所以我加载保存为另一个Parquet分割它 MODULE . 有 9 每个模块都有来自所有 86 因此,最终的Parquet地板 9 * 1806 = 16254 文件夹。
第三步我把每个 MODULE 通过for循环进行分区,执行聚合,并在append模式下将其另存为文件夹,以便将9个模块作为文件夹: s3://path/MODULE A/ , s3://path/MODULE B 它们不是按模块划分的,只是保存为文件夹。因为我的默认spark numpartitions是 201 ,每个模块文件夹 201 文件等共 9 * 201 = 1809 文件夹
第四步到目前为止还不错,但我需要分区回来 DATE . 所以我循环了一下 MODULE 分区并将文件保存为一个不带任何分区的Parquet文件。结果总共 2751 文件夹。我不知道这是怎么计算出来的。
步骤5然后我加载整个未分区的文件,并通过分区保存它 DATE . 这导致了大约 39k 文件,每个文件约为1.5mb。所以我有大量的小文件,它需要非常长的时间来加载Parquet地板或对它们做任何类型的操作,如 groupBy 等。
在做了更多的阅读之后,我试着使用 repartition(1).partitionBy('DATE') 在第四步中减少了文件的数量,但最后还是失败了。我知道从第四步开始我做错了什么。有没有更有效的方法来做这件事?
谢谢您

eqfvzcg8

eqfvzcg81#

找到正确的分区数是您关心的问题。
假设您有86天的数据,并且希望按日期分区保存它。然后您应该知道在一个分区下要创建多少个文件。
假设每个日期有3 gb的数据,那么您可能希望每个日期文件夹中至少有6个文件。
你可以这样做

df.repartition(6,'date').write.partitionBy('date')...

现在,如果要限制每个文件中的记录数,请使用

df.repartition(6, 'date').write.option("maxRecordsPerFile", 10000).partitionBy('date')...

相关问题