Spark pushed过滤器无效

b1payxdu  于 2023-08-06  发布在  Apache
关注(0)|答案(1)|浏览(142)

我正在对一个 parquet 表运行一个简单的计数查询,查询如下:

SELECT COUNT(*) FROM parquet_table WHERE 
dt >= '2023-04-14T06:00:00Z' and dt < '2023-06-13T06:00:00Z' and 
car.type = 'auto' and car.platform = 'IOS' and at >= 1681452000000 
and at <= 1686636000000 ;

字符串
其中dt是表分区。
检查计划,我可以注意到所有过滤器都被下推到数据源:

Scan parquet spark_catalog.default.parquet_table
Output [3]: [at#3L, car#5, dt#25]
Batched: true
Location: InMemoryFileIndex [s3a://...., ... 1439 entries]
PartitionFilters: [isnotnull(dt#25), (dt#25 >= 2023-04-14T06:00:00Z), (dt#25 < 2023-06-13T06:00:00Z)]
PushedFilters: [IsNotNull(car.type), IsNotNull(car.platform), IsNotNull(at), EqualTo(car.type,auto), EqualTo(car.platform,IOS), GreaterThanOrEqual(at,1681452000000), LessThanOrEqual(at,1686636000000)]
ReadSchema: struct<at:bigint,car:struct<platform:string,type:string>>


然而,当检查 parquet 扫描的行数时,我发现这些行并没有真正根据推送的过滤器进行过滤,只有在Spark过滤器之后才是如此。


的数据
另外,既然所有的过滤器都被下推了,优化器难道不应该足够聪明,完全删除过滤器步骤吗?
我使用Apache Spark 3.3.2,Scala 2.12

u5rb5r59

u5rb5r591#

Spark推送过滤器可能只对数据块进行部分过滤,而不是逐行应用过滤器。
例如,给定数据块,检查IOS是否在最小值和最大值car.platform之间。
如果是,则读取整个块(沿着所有不需要的行)
否则,跳过它。
我在Spark中找不到这方面的文档,但这是Apache Drill文档describes parquet filter pushdown
Spark 3.2 API documentation也暗示了这个。
有3种过滤器:

  • 可推送的过滤器,在扫描后不需要再次评估。
  • 在扫描之后仍然需要评估的可推滤波器,例如 parquet 行组筛选器。
  • 非可推动过滤器。

在我的例子中,推送过滤器将读取的行从1.4B减少到了150M。
最终的过滤结果是22M行,因此即使推送的过滤器只过滤了部分数据,它们仍然显著降低了运行时间。

相关问题