spark没有利用Parquet地板的hdfs分区

qhhrdooz  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(388)

我正在使用以下命令将Parquet文件写入hdfs: df.write.mode(SaveMode.Append).partitionBy(id).parquet(path) 之后,我读取并过滤文件,如下所示:

val file = sqlContext.read.parquet(folder)
val data = file.map(r => Row(r.getInt(4).toString, r.getString(0), r.getInt(1),
    r.getLong(2), r.getString(3)))

val filteredData = data.filter(x => x.thingId.equals("1"))
filteredData.collect()

我预计,spark将利用文件的分区,只读取“thingid=1”的分区。事实上,spark读取文件的所有分区,而不仅仅是过滤的分区(thingid=1的分区)。如果我查看日志,我可以看到它确实读取了所有内容:
16/03/21 01:32:33信息Parquet:正在从读取Parquet文件hdfs://sandbox.hortonworks.com/path/id=1/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01:32:33信息Parquet关系:读取Parquet文件hdfs://sandbox.hortonworks.com/path/id=42/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.Parquet地板16/03/21 01:32:33信息Parquet:正在从读取Parquet文件hdfs://sandbox.hortonworks.com/path/id=17/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01:32:33信息Parquet关系:读取Parquet文件hdfs://sandbox.hortonworks.com/path/0833/id=33/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.Parquet地板16/03/21 01:32:33信息Parquet:正在从读取Parquet文件hdfs://sandbox.hortonworks.com/path/id=26/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 16/03/21 01:32:33信息Parquet关系:读取Parquet文件hdfs://sandbox.hortonworks.com/path/id=12/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.Parquet地板
我有什么遗漏吗?当我查看文档时,spark应该知道基于过滤器,它应该只读取thingid=1的分区。你们有人知道问题出在哪里吗?

rlcwz9us

rlcwz9us1#

一些问题可能会阻止spark成功地“下推” predicate (即在输入格式级别使用过滤器):
过滤器下推关闭:根据您使用的spark版本, predicate 下推选项( spark.sql.parquet.filterPushdown )可能已关闭。从spark 1.5.0开始,默认情况下它处于启用状态-因此请检查您的版本和配置
过滤器是“不透明的”:这里似乎是这样的:您正在加载parquet文件,将每一行Map到另一行(对列重新排序?),然后使用 filter 接受函数的方法。spark不能“读取”函数代码并意识到它使用了分区列上的比较-to spark,这只是一个 Row => Boolean 可以做各种检查的函数。。。
要使过滤器下推工作,您需要在将记录Map到与原始结构“分离”的内容之前使用它,并使用 filter 使用可由spark解析的筛选器的重载,例如:

// assuming the relevant column name is "id" in the parquet structure
val filtered = file.filter("id = 1") 

// or:
val filtered = file.filter(col("id") === 1) 

// and only then:
val data = filtered.map(r => Row(...))

相关问题