我检查了spark作业的输出Parquet文件,因为 Out of Memory Errors
. 我用 Spark 1.6.0
在 Cloudera 5.13.1
我注意到Parquet地板行组大小不均匀。第一排和最后一排的队伍都很庞大。剩下的真的很小。。。
Parquet工具产量减少 RC = row count
, TS = total size
:
row group 1: RC:5740100 TS:566954562 OFFSET:4
row group 2: RC:33769 TS:2904145 OFFSET:117971092
row group 3: RC:31822 TS:2772650 OFFSET:118905225
row group 4: RC:29854 TS:2704127 OFFSET:119793188
row group 5: RC:28050 TS:2356729 OFFSET:120660675
row group 6: RC:26507 TS:2111983 OFFSET:121406541
row group 7: RC:25143 TS:1967731 OFFSET:122069351
row group 8: RC:23876 TS:1991238 OFFSET:122682160
row group 9: RC:22584 TS:2069463 OFFSET:123303246
row group 10: RC:21225 TS:1955748 OFFSET:123960700
row group 11: RC:19960 TS:1931889 OFFSET:124575333
row group 12: RC:18806 TS:1725871 OFFSET:125132862
row group 13: RC:17719 TS:1653309 OFFSET:125668057
row group 14: RC:1617743 TS:157973949 OFFSET:134217728
这是已知的错误吗?如何在spark中设置 parquet 块大小(行组大小)?
编辑:
spark应用程序所做的是:它读取一个大的avro文件,然后通过两个分区键(使用 distribute by <part_keys>
然后使用以下命令为每个分区写入Parquet文件: DF.write.partitionBy(<part_keys>).parquet(<path>)
2条答案
按热度按时间9rygscc11#
有一个已知的错误:Parquet-1337
atmip9wb2#
您的rdd可能分区不均。每个块中的行数与rdd的不同分区的大小有关。
创建rdd时,每个分区包含的数据量大致相同(由于hashpartitioner)。在处理spark作业之后,一个分区可能比另一个分区包含更多的数据,可能过滤器转换从一个分区中删除的行比从另一个分区中删除的行多。在写入parquet文件之前,可以调用repartition重新平衡分区。
编辑:如果问题与分区无关,那么减小行组的大小可能会有所帮助: