flink stream sql排序依据

qcbq4gxm  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(928)

我有一个流式输入,比如说股票价格数据(包括多只股票),我想每1分钟按它们的价格做一次排名。排名基于所有股票的最新价格,需要对所有股票进行排序,无论是否在前1分钟更新。我尝试在flinkstreamsql中使用order by。
我没能实现我的逻辑,我对两个部分感到困惑:
为什么可以 ORDER BY 仅将时间属性用作主要且仅支持 ASC ? 我如何通过价格等其他类型实现订单?
下面的sql(来自flink文档)是什么意思?没有窗口,也没有窗口,所以我假设sql将立即为每个订单执行,在这种情况下,对一个元素进行排序看起来毫无意义。
[更新]:当我阅读procimesortprocessfunction.scala的代码时,flink似乎对下一毫秒内接收到的元素进行了排序。

SELECT *
FROM Orders
ORDER BY orderTime

最后,有没有办法用sql实现我的逻辑?

5f0d552i

5f0d552i1#

ORDER BY 流式处理中的查询很难计算,因为当我们必须发出一个需要转到结果表开头的结果时,我们不想更新整个结果。因此,我们只支持 ORDER BY time-attribute 如果我们能保证结果有(大致)增加的时间戳。
在将来(flink1.6或更高版本),我们还将支持一些查询,如 ORDER BY x ASC LIMIT 10 ,这将生成一个更新表,其中包含最小值为10的记录 x 价值观。
无论如何,你不能(轻松地)用一个 GROUP BY 翻窗。 GROUP BY 查询聚合组的记录(如果是 GROUP BY TUMBLE(rtime, INTERVAL '1' MINUTE) )变成一张唱片。所以每分钟不会有多个记录,只有一个。
如果你想要一个查询来计算前10名 a 每分钟您将需要一个类似以下查询:

SELECT a, b, c 
FROM (
  SELECT 
    a, b, c, 
    RANK() OVER (ORDER BY a PARTITION BY CEIL(t TO MINUTE) BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as rank 
  FROM yourTable)
WHERE rank <= 10

但是,flink(1.4版)还不支持这样的查询,因为时间属性用于 PARTITION BY 子句而不是 ORDER BY 合同条款 OVER Windows。

相关问题