我正在对一个 parquet 表运行一个简单的计数查询,查询如下:
SELECT COUNT(*) FROM parquet_table WHERE
dt >= '2023-04-14T06:00:00Z' and dt < '2023-06-13T06:00:00Z' and
car.type = 'auto' and car.platform = 'IOS' and at >= 1681452000000
and at <= 1686636000000 ;
字符串
其中dt是表分区。
检查计划,我可以注意到所有过滤器都被下推到数据源:
Scan parquet spark_catalog.default.parquet_table
Output [3]: [at#3L, car#5, dt#25]
Batched: true
Location: InMemoryFileIndex [s3a://...., ... 1439 entries]
PartitionFilters: [isnotnull(dt#25), (dt#25 >= 2023-04-14T06:00:00Z), (dt#25 < 2023-06-13T06:00:00Z)]
PushedFilters: [IsNotNull(car.type), IsNotNull(car.platform), IsNotNull(at), EqualTo(car.type,auto), EqualTo(car.platform,IOS), GreaterThanOrEqual(at,1681452000000), LessThanOrEqual(at,1686636000000)]
ReadSchema: struct<at:bigint,car:struct<platform:string,type:string>>
型
然而,当检查 parquet 扫描的行数时,我发现这些行并没有真正根据推送的过滤器进行过滤,只有在Spark过滤器之后才是如此。
的数据
另外,既然所有的过滤器都被下推了,优化器难道不应该足够聪明,完全删除过滤器步骤吗?
我使用Apache Spark 3.3.2,Scala 2.12
1条答案
按热度按时间u5rb5r591#
Spark推送过滤器可能只对数据块进行部分过滤,而不是逐行应用过滤器。
例如,给定数据块,检查
IOS
是否在最小值和最大值car.platform
之间。如果是,则读取整个块(沿着所有不需要的行)
否则,跳过它。
我在Spark中找不到这方面的文档,但这是Apache Drill文档describes parquet filter pushdown。
Spark 3.2 API documentation也暗示了这个。
有3种过滤器:
在我的例子中,推送过滤器将读取的行从1.4B减少到了150M。
最终的过滤结果是22M行,因此即使推送的过滤器只过滤了部分数据,它们仍然显著降低了运行时间。