我在aws胶水上运行spark作业。该作业转换数据并将输出保存到按日期(年、月、日目录)分区的Parquet文件中。job必须能够处理数TB的输入数据,并使用数百个执行器,每个执行器的内存限制为5.5 gb。
输入包括超过2年的数据。每个日期的输出Parquet文件应该尽可能大,可以选择分割成500MB的块。不需要每天创建多个小文件。
很少有经过测试的方法:
按与写入中相同的列重新分区会导致执行器出现内存不足错误:
df = df.repartition(*output_partitions)
(df
.write
.partitionBy(output_partitions)
.parquet(output_path))
使用随机值的附加列重新分区会导致写入多个小输出文件(对应于 spark.sql.shuffle.partitions
值):
df = df.repartition(*output_partitions, "random")
(df
.write
.partitionBy(output_partitions)
.parquet(output_path))
设置中的分区数 repartition
函数,例如to 10,给出10个相当大的输出文件,但我担心在加载实际数据(大小为tbs)时会导致内存不足错误:
df = df.repartition(10, *output_partitions, "random")
(df
.write
.partitionBy(output_partitions)
.parquet(output_path))
( df
在代码段中是一个常规的spark(Dataframe)
我知道我可以用 maxRecordsPerFile
写入选项。但这限制了从单个内存分区创建的输出,因此首先,我需要按日期创建分区。
所以问题是如何将内存中的数据重新分区为:
将其拆分为多个执行器以防止内存不足错误,
将每天的输出保存到有限数量的大Parquet文件中,
并行写入输出文件(使用尽可能多的执行器)?
我读过这些资料,但没有找到解决办法:
https://mungingdata.com/apache-spark/partitionby/
https://stackoverflow.com/a/42780452
https://stackoverflow.com/a/50812609
暂无答案!
目前还没有任何答案,快来回答吧!