为高并发和大输出文件按日期重新分区

kzmpq1sx  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(241)

我在aws胶水上运行spark作业。该作业转换数据并将输出保存到按日期(年、月、日目录)分区的Parquet文件中。job必须能够处理数TB的输入数据,并使用数百个执行器,每个执行器的内存限制为5.5 gb。
输入包括超过2年的数据。每个日期的输出Parquet文件应该尽可能大,可以选择分割成500MB的块。不需要每天创建多个小文件。
很少有经过测试的方法:
按与写入中相同的列重新分区会导致执行器出现内存不足错误:

df = df.repartition(*output_partitions)

(df
 .write
 .partitionBy(output_partitions)
 .parquet(output_path))

使用随机值的附加列重新分区会导致写入多个小输出文件(对应于 spark.sql.shuffle.partitions 值):

df = df.repartition(*output_partitions, "random")

(df
 .write
 .partitionBy(output_partitions)
 .parquet(output_path))

设置中的分区数 repartition 函数,例如to 10,给出10个相当大的输出文件,但我担心在加载实际数据(大小为tbs)时会导致内存不足错误:

df = df.repartition(10, *output_partitions, "random")

(df
 .write
 .partitionBy(output_partitions)
 .parquet(output_path))

( df 在代码段中是一个常规的spark(Dataframe)
我知道我可以用 maxRecordsPerFile 写入选项。但这限制了从单个内存分区创建的输出,因此首先,我需要按日期创建分区。
所以问题是如何将内存中的数据重新分区为:
将其拆分为多个执行器以防止内存不足错误,
将每天的输出保存到有限数量的大Parquet文件中,
并行写入输出文件(使用尽可能多的执行器)?
我读过这些资料,但没有找到解决办法:
https://mungingdata.com/apache-spark/partitionby/
https://stackoverflow.com/a/42780452
https://stackoverflow.com/a/50812609

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题