我有下面的场景Parquet文件和阅读它使用Spark。。
脚本:
Parquet锉数:1
文件中的块数(行组):3
每个块(行组)的大小如下
blockSize: 195 MB, rowCount: 1395661, compressedSize: 36107 bytes
blockSize: 295 MB, rowCount: 1538519, compressedSize: 38819 bytes
blockSize: 13 MB, rowCount: 52945, compressedSize: 1973 bytes
当我尝试使用spark读取这个Parquet文件时,它只创建一个分区。。下面是代码
val df = sqlContext.read.parquet(path)
println(df.rdd.getNumPartitions) // result is 1
parquet.block.size=128 mb
据我所知,hadoop在读取操作期间将一个hdfs块Map到一个Parquet块大小,因此根据本例,它应该Map到三个hdfs块。。当我尝试使用spark读取这个Parquet文件时,我期望有3个分区,但结果是1个分区,我猜spark是基于Parquet文件大小(即压缩大小)而不是基于文件中的块大小创建分区数。
问题是,为什么spark不根据Parquet文件中的块数/块大小对数据进行分区,而是按Parquet文件大小(压缩大小)进行分区?
1条答案
按热度按时间llycmphe1#
需要注意的是,您的代码创建的是sparkDataframe,而不是rdd。考虑到这一点,我认为这是一个很好的答案,可以回答spark如何决定Dataframe中的分区数的问题。
Dataframe和RDD是不同的数据表示。下面的文章介绍了不同的数据表示形式以及何时使用哪种数据表示形式:
https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-dataset...
首先,由于dataframe和dataset API构建在spark sql引擎之上,因此它使用catalyst生成优化的逻辑和物理查询计划
下面的堆栈溢出文章介绍了如何在spark 1.6.0及更高版本中重新划分Dataframe:
如何定义Dataframe的分区?
也就是说,Dataframe是由catalyst优化的,我建议让优化器进行优化。
将此理解应用于您的问题:
“为什么spark不根据Parquet文件中的块数/块大小对数据进行分区?”
从Parquet文件创建Dataframe时,请求由catalyst opimizer处理。catalyst生成一个物理查询计划,包括数据分区,并针对您的集群配置进行了优化。