我在查询执行计划中看到名为DataFilter
的内容:
FileScan parquet [product_id#12,price#14] Batched: true, DataFilters: [isnotnull(product_id#12)], Format: Parquet, Location: InMemoryFileIndex[gs://monsoon-credittech.appspot.com/spark_datasets/products_parquet_dtc], PartitionFilters: [], PushedFilters: [IsNotNull(product_id)], ReadSchema: struct<product_id:int,price:int>
有一个
- 分区筛选器:[]
- 推送过滤器:[不为空(产品ID)]
- 数据筛选器:[不为空(产品ID #12)]
我理解PartitionFilter
和PushedFilter
。但是,此处显示的DataFilter
是什么?类似问题here有一个答案。但是,此处给出的DataFilter
的定义正是我所认为的PushedFilter
(还有,那个答案有1张反对票)那么,我对PushedFilter
的理解错了吗?如果没有,那么,DataFilter
是什么?
1条答案
按热度按时间mwkjh3gx1#
这一解释是针对Spark的最新版本(3.3.1)。
PushedFilters
是DataFilters
的一个子集,您可以在DataSourceScanExec.scala
中看到这一点。它们是DataFilters
,我们可以将其 predicate 向下推,以过滤您试图读取的文件的元数据,而不是数据本身。当然,过滤元数据比过滤数据本身要快得多,因为在这样做的时候,您可能可以跳过读取大块数据。为了构建一切,我们需要:
DataFilters
DataFilter
而不是PushedFilter
时,这意味着我们不能下推 predicate 来过滤底层文件的元数据。示例
让我们以 parquet 文件为例(并非所有文件格式都支持 predicate 下推,但 parquet 文件支持):
所以我们只需要编写一个由
colA
列分区的parquet文件,文件结构如下所示:让我们看看3种过滤器类型:
分区筛选器
在这里,您可以看到我们的过滤器是
PartitionFilter
,因为我们的数据已经被colA
分区,所以我们可以轻松地对目录进行过滤。推送过滤器
在这里,您可以看到我们的筛选器(
colB < 10
)是DataFilters
的一部分,这是因为colB
不是分区列。它也是
PushedFilters
的一部分,因为这是一个我们可以下推的 predicate 。Parquet文件将块的最小值和最大值存储为元数据。因此,如果块的最小值大于10,我们知道可以跳过读取该块。非推送过滤器
这个过滤器比较复杂,
colB < colC
不是一个我们可以下推来过滤parquet文件的元数据的过滤器,这意味着我们需要读取完整的数据,然后在内存中过滤。