在Spark中优化简单查询

ghhaqwfi  于 2023-10-23  发布在  Apache
关注(0)|答案(1)|浏览(108)

我有一个执行count(*)min()max()count with a filter based on column type的SQL查询。

id  val  desc   date
1   10   Test   2023-01-01
2   20   Test1  2023-10-01
3   30          2023-01-02
4   40   Test2  2023-01-01

对于所描述的DF,我有一个脚本,它将生成以下查询:

SELECT 
COUNT(DISTINCT id) as distinct_id,
COUNT(*) as rows_id,
COUNT(DISTINCT id) as distinct_val,
COUNT(*) as rows_val,
COUNT(DISTINCT desc) as distinct_desc,
COUNT(*) as rows_desc,
(SELECT COUNT(*) FROM table WHERE TRIM(desc) = '') as missing_desc,
COUNT(DISTINCT date) as distinct_date,
COUNT(*) as rows_date,
COUNT(date) as count_date,
MIN(date) as min_date,
MAX(date) as max_date
FROM table

这个查询并不复杂,尽管在我的prod环境中,我的表大约有100万行和150列,但我在这种情况下的结果DF至少有350列。
当我使用df.explain()来获取查询计划时,我有以下查询计划(在这里抽象):

HashAggregateKeys...
       Subquery...
          HashAggregate...
            Exchange SinglePartition....
               HashAggregate...
                  Project...
                     Filter...
                        InMemoryTableScan...
                           InMemoryRelation...
                              Project...
                                 FileScan...
        Subquery...
          HashAggregate...
            Exchange SinglePartition....
               HashAggregate...
                  Project...
                     Filter...
                        InMemoryTableScan...
                           InMemoryRelation...
                              Project...
                                 FileScan...
         ... (for each column the query plan repeats)
Exchange SinglePartition...
  HashAggregate...
    HashAggregate...
       Exchange hashpartitioning...
          HashAggregate...
             Expand...
                InMemoryTableScan...
                    InMemoryRelation...
                       Project...
                          FileScan...

我的集群有1个worker(自动扩展到20个worker-它总是从1个worker开始,并在执行过程中扩展)。尽管如此,SQL查询的性能并不好。提取指标大约需要10分钟(在我的prod DF上)。我的疑问是:

  • 考虑到所呈现的查询,在不改变其结构的情况下,是否有任何方法可以通过Spark中的一些额外配置来优化其执行时间?
  • 如果有必要更改查询,那么获取每列指标的最佳方法是什么?
  • 注1*:* 我从来没有通过观察查询计划来优化查询,这就是我问这个问题的原因。*
  • 注2*:* 我已经尝试过使用df.summary(),但是它没有为日期带来指标或为字符串带来自定义指标。这就是我构建SQL查询的原因。*
bqf10yzr

bqf10yzr1#

您正在重复一些操作,您可以使用已计算的列,如下所示:

select 
    distinct_id,
    distinct_id as distinct_val,
    rows_id,
    rows_id as  rows_val,
    rows_id as rows_desc,
    rows_id as rows_date,
    count_date,
    min_date,
    max_date,
    distinct_date,
    distinct_desc
from (
    SELECT 
            COUNT(DISTINCT id) as distinct_id,
            COUNT(*) as rows_id,
            COUNT(DISTINCT desc) as distinct_desc,
            (SELECT COUNT(*) FROM table WHERE TRIM(desc) = '') as missing_desc, --- this column can calculated separately and then join the main result
            COUNT(DISTINCT date) as distinct_date,
            COUNT(date) as count_date,
            MIN(date) as min_date,
            MAX(date) as max_date
    FROM table
)t

希望能帮上忙。

相关问题