pyspark中的数据过滤器是什么?

czq61nw1  于 2023-02-05  发布在  Apache
关注(0)|答案(1)|浏览(161)

我在查询执行计划中看到名为DataFilter的内容:

FileScan parquet [product_id#12,price#14] Batched: true, DataFilters: [isnotnull(product_id#12)], Format: Parquet, Location: InMemoryFileIndex[gs://monsoon-credittech.appspot.com/spark_datasets/products_parquet_dtc], PartitionFilters: [], PushedFilters: [IsNotNull(product_id)], ReadSchema: struct<product_id:int,price:int>

有一个

  • 分区筛选器:[]
  • 推送过滤器:[不为空(产品ID)]
  • 数据筛选器:[不为空(产品ID #12)]

我理解PartitionFilterPushedFilter。但是,此处显示的DataFilter是什么?类似问题here有一个答案。但是,此处给出的DataFilter的定义正是我所认为的PushedFilter(还有,那个答案有1张反对票)那么,我对PushedFilter的理解错了吗?如果没有,那么,DataFilter是什么?

mwkjh3gx

mwkjh3gx1#

这一解释是针对Spark的最新版本(3.3.1)。
PushedFiltersDataFilters的一个子集,您可以在DataSourceScanExec.scala中看到这一点。它们是DataFilters,我们可以将其 predicate 向下推,以过滤您试图读取的文件的元数据,而不是数据本身。当然,过滤元数据比过滤数据本身要快得多,因为在这样做的时候,您可能可以跳过读取大块数据。
为了构建一切,我们需要:

      • 分区筛选器**:在分区列上过滤。允许忽略parquet文件中的目录。
      • 数据过滤器**:非分区列上的筛选器。
      • 推送过滤器**:那些我们可以下推其 predicate 的DataFilters
  • 因此,当过滤器是DataFilter而不是PushedFilter时,这意味着我们不能下推 predicate 来过滤底层文件的元数据。
示例

让我们以 parquet 文件为例(并非所有文件格式都支持 predicate 下推,但 parquet 文件支持):

import org.apache.spark.sql.functions.col

val df = Seq(
  (1,2,3),
  (2,2,3),
  (3,20,300),
  (1,24,299),
).toDF("colA", "colB", "colC")

df.write.partitionBy("colA").mode("overwrite").parquet("datafilter.parquet")

所以我们只需要编写一个由colA列分区的parquet文件,文件结构如下所示:

datafilter.parquet/
├── colA=1
│   ├── part-00000-55cb3320-f145-4d64-8cba-55a72111c0c8.c000.snappy.parquet
│   └── part-00003-55cb3320-f145-4d64-8cba-55a72111c0c8.c000.snappy.parquet
├── colA=2
│   └── part-00001-55cb3320-f145-4d64-8cba-55a72111c0c8.c000.snappy.parquet
├── colA=3
│   └── part-00002-55cb3320-f145-4d64-8cba-55a72111c0c8.c000.snappy.parquet
└── _SUCCESS

让我们看看3种过滤器类型:

分区筛选器
spark.read.parquet("./datafilter.parquet").filter(col("colA") < 10).explain

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [colB#165,colC#166,colA#167] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:somePath/spark-tests/datafilter.parquet], PartitionFilters: [isnotnull(colA#167), (colA#167 < 10)], PushedFilters: [], ReadSchema: struct<colB:int,colC:int>

在这里,您可以看到我们的过滤器是PartitionFilter,因为我们的数据已经被colA分区,所以我们可以轻松地对目录进行过滤。

推送过滤器
spark.read.parquet("./datafilter.parquet").filter(col("colB") < 10).explain

== Physical Plan ==
*(1) Filter (isnotnull(colB#172) AND (colB#172 < 10))
+- *(1) ColumnarToRow
   +- FileScan parquet [colB#172,colC#173,colA#174] Batched: true, DataFilters: [isnotnull(colB#172), (colB#172 < 10)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:somePath/spark-tests/datafilter.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(colB), LessThan(colB,10)], ReadSchema: struct<colB:int,colC:int>

在这里,您可以看到我们的筛选器(colB < 10)是DataFilters的一部分,这是因为colB不是分区列。
它也是PushedFilters的一部分,因为这是一个我们可以下推的 predicate 。Parquet文件将块的最小值和最大值存储为元数据。因此,如果块的最小值大于10,我们知道可以跳过读取该块。

非推送过滤器
spark.read.parquet("./datafilter.parquet").filter(col("colB") < col("colC")).explain

== Physical Plan ==
*(1) Filter ((isnotnull(colB#179) AND isnotnull(colC#180)) AND (colB#179 < colC#180))
+- *(1) ColumnarToRow
   +- FileScan parquet [colB#179,colC#180,colA#181] Batched: true, DataFilters: [isnotnull(colB#179), isnotnull(colC#180), (colB#179 < colC#180)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:somePath/spark-tests/datafilter.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(colB), IsNotNull(colC)], ReadSchema: struct<colB:int,colC:int>

这个过滤器比较复杂,colB < colC不是一个我们可以下推来过滤parquet文件的元数据的过滤器,这意味着我们需要读取完整的数据,然后在内存中过滤。

相关问题