Spark SQL correlated subquery with GROUP BY

kkbh8khc asked on 2023-11-21 in Apache

Spark SQL version 3.3.
The following query:

select d.from_id,
       d.to_id,
       d.hts_code,
       min(d.transaction_date)                                            as earliest_transaction_date,
       max(d.transaction_date)                                            as latest_transaction_date,
       cast(months_between(current_date, max(d.transaction_date)) AS INT)  AS months_since_last_transaction,
       (select count(*)
            from quarters q
            WHERE q.from_id = d.from_id
            AND q.to_id = d.to_id
            AND q.hts_code = d.hts_code
            group by q.from_id, q.to_id, q.hts_code
        ) as quarters
from data d
group by d.from_id,
         d.to_id,
         d.hts_code;

fails with the following error:

AnalysisException: Correlated scalar subquery 'scalarsubquery(d.from_id, d.to_id, d.hts_code)' is neither present in the group by, nor in an aggregate function. Add it to group by using ordinal position or wrap it in first() (or first_value) if you don't care which value you get.


I cannot add an outer reference anywhere other than the subquery's WHERE clause. I really don't understand what the analyzer expects here.

Answer by tv6aics1:

Forgive the Scala:

import sparkSession.implicits._

// Toy data: two rows for "a", one row for "b"
val data = Seq(("a", 1), ("b", 2), ("a", 2)).toDF("letter", "number")
data.createOrReplaceTempView("data")

// Correlated scalar subquery in the select list:
// count the rows in data that share the outer row's letter
sparkSession.sql(s"""
select letter, number, (
  select count(*)
  from data a
  where o.letter = a.letter
)
from data o
""").show

yields, as you'd expect:

+------+------+----------------------+
|letter|number|scalarsubquery(letter)|
+------+------+----------------------+
|     a|     1|                     2|
|     b|     2|                     1|
|     a|     2|                     2|
+------+------+----------------------+


So this is a simple correlated value: it can only ever have one result per outer row.
Using this query after the code above to simulate a group:

sparkSession.sql(s"""
select letter, number, (
  select count(*)
  from data a
  where o.letter = a.letter
  group by a.letter
)
from data o
""").show


results in:

A GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns: letter#7


This is analogous to your SQL:

select count(*)
...
group by q.from_id, q.to_id, q.hts_code


This is not valid SQL for a scalar subquery. You would need those grouping values in the select list (the first part of the problem), and, as you noticed, using an outer reference anywhere other than in WHERE/HAVING produces the error:

Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses


So next you try:

sparkSession.sql(s"""
select letter, number, (
  select count(*) c, a.letter
  from data a
  where o.letter = a.letter
  group by a.letter
) thecount
from data o
""").show


which leaves you with the error:

Scalar subquery must return only one column, but got 2


You can rewrite it as a join:

sparkSession.sql(s"""
select o.letter, o.number, a.c
from data o join (
  select count(*) c, a.letter
  from data a
  group by a.letter
) a on a.letter = o.letter
""").show


producing:

+------+------+---+
|letter|number|  c|
+------+------+---+
|     a|     2|  2|
|     a|     1|  2|
|     b|     2|  1|
+------+------+---+
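
Applied back to the question's query, the same join rewrite might look like this (a sketch, untested against the original schema; a left join plus coalesce would keep keys that have no quarters rows):

select d.from_id,
       d.to_id,
       d.hts_code,
       min(d.transaction_date) as earliest_transaction_date,
       max(d.transaction_date) as latest_transaction_date,
       cast(months_between(current_date, max(d.transaction_date)) as int) as months_since_last_transaction,
       max(q.quarters) as quarters -- q has one row per key, so max() is exact
from data d
join (select from_id, to_id, hts_code, count(*) as quarters
      from quarters
      group by from_id, to_id, hts_code) q
  on q.from_id = d.from_id
 and q.to_id = d.to_id
 and q.hts_code = d.hts_code
group by d.from_id,
         d.to_id,
         d.hts_code;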


If you must have a similar query as a subquery (e.g. your fields are defined elsewhere; Quality takes this approach), you can try rewriting it like this:

sparkSession.sql(s"""
select letter, number,
  (
    select max(c)
    from (
      select count(*) c, letter
      from data
      group by letter
    ) a where o.letter = a.letter
  ) thecount
from data o
""").show


which also produces:

+------+------+--------+
|letter|number|thecount|
+------+------+--------+
|     a|     1|       2|
|     b|     2|       1|
|     a|     2|       2|
+------+------+--------+


In this case you must still collapse to a single return value with an aggregate (max, first, etc.).
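
Applying that pattern to the question's query might look like the sketch below (untested). Because the outer query itself aggregates, the scalar subquery additionally has to be wrapped in an aggregate such as first(), which is exactly what the analyzer's hint in the original error suggests:

select d.from_id,
       d.to_id,
       d.hts_code,
       min(d.transaction_date) as earliest_transaction_date,
       max(d.transaction_date) as latest_transaction_date,
       cast(months_between(current_date, max(d.transaction_date)) as int) as months_since_last_transaction,
       first(
         (select max(q.quarters) -- aggregate so the subquery returns one value
          from (select from_id, to_id, hts_code, count(*) as quarters
                from quarters
                group by from_id, to_id, hts_code) q
          where q.from_id = d.from_id
            and q.to_id = d.to_id
            and q.hts_code = d.hts_code)
       ) as quarters
from data d
group by d.from_id,
         d.to_id,
         d.hts_code;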
Note: as of 3.4/3.5 correlated subqueries can be more powerful: you are free to use exists and in as well; see SPARK-36114.
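
As one illustration (my sketch, not from the original answer; verify against your version): 3.3 rejects a non-equality correlated predicate under an aggregate, whereas a query like the following is the kind these releases aim to support:

select letter, number,
       (select count(*)
        from data a
        where a.number > o.number -- non-equality correlation
       ) as larger_count
from data o;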
