我有一个执行count(*)
、min()
、max()
和count with a filter based on column type
的SQL查询。
id val desc date
1 10 Test 2023-01-01
2 20 Test1 2023-10-01
3 30 2023-01-02
4 40 Test2 2023-01-01
对于所描述的DF,我有一个脚本,它将生成以下查询:
SELECT
COUNT(DISTINCT id) as distinct_id,
COUNT(*) as rows_id,
COUNT(DISTINCT id) as distinct_val,
COUNT(*) as rows_val,
COUNT(DISTINCT desc) as distinct_desc,
COUNT(*) as rows_desc,
(SELECT COUNT(*) FROM table WHERE TRIM(desc) = '') as missing_desc,
COUNT(DISTINCT date) as distinct_date,
COUNT(*) as rows_date,
COUNT(date) as count_date,
MIN(date) as min_date,
MAX(date) as max_date
FROM table
这个查询并不复杂,尽管在我的prod环境中,我的表大约有100万行和150列,但我在这种情况下的结果DF至少有350列。
当我使用df.explain()
来获取查询计划时,我有以下查询计划(在这里抽象):
HashAggregateKeys...
Subquery...
HashAggregate...
Exchange SinglePartition....
HashAggregate...
Project...
Filter...
InMemoryTableScan...
InMemoryRelation...
Project...
FileScan...
Subquery...
HashAggregate...
Exchange SinglePartition....
HashAggregate...
Project...
Filter...
InMemoryTableScan...
InMemoryRelation...
Project...
FileScan...
... (for each column the query plan repeats)
Exchange SinglePartition...
HashAggregate...
HashAggregate...
Exchange hashpartitioning...
HashAggregate...
Expand...
InMemoryTableScan...
InMemoryRelation...
Project...
FileScan...
我的集群有1个worker(自动扩展到20个worker-它总是从1个worker开始,并在执行过程中扩展)。尽管如此,SQL查询的性能并不好。提取指标大约需要10分钟(在我的prod DF上)。我的疑问是:
- 考虑到所呈现的查询,在不改变其结构的情况下,是否有任何方法可以通过Spark中的一些额外配置来优化其执行时间?
- 如果有必要更改查询,那么获取每列指标的最佳方法是什么?
- 注1*:* 我从来没有通过观察查询计划来优化查询,这就是我问这个问题的原因。*
- 注2*:* 我已经尝试过使用df.summary(),但是它没有为日期带来指标或为字符串带来自定义指标。这就是我构建SQL查询的原因。*
1条答案
按热度按时间bqf10yzr1#
您正在重复一些操作,您可以使用已计算的列,如下所示:
希望能帮上忙。