flink sql中跳跃窗口上的指数衰减移动平均:投射时间

dfty9e19  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(557)

现在,我们在flink中使用了具有奇特窗口的sql,我正在尝试使用“在未来flink发布的表api和sql中可能出现的情况”来引用衰减的移动平均值。来自他们的sql路线图/预览2017-03帖子:

table
  .window(Slide over 1.hour every 1.second as 'w)
  .groupBy('productId, 'w)
  .select(
    'w.end,
    'productId,
    ('unitPrice * ('rowtime - 'w.start).exp() / 1.hour).sum / (('rowtime - 'w.start).exp() / 1.hour).sum)

以下是我的尝试(也受到方解石衰变例子的启发):

SELECT                                                                              
  lb_index one_key,                                                           
  HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND) start_time,  
  SUM(Y * 
      EXP(
        proctime - 
        HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
      ))                                                             
FROM write_position                                                                
GROUP BY lb_index, HOP(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)

time是处理时间,我们从appendstream表中创建write\u位置得到proctime,如下所示:

tEnv.registerTable(
    "write_position", 
    tEnv.fromDataStream(appendStream, "lb_index, Y, proctime.proctime"))

我得到这个错误:

Cannot apply '-' to arguments of type '<TIME ATTRIBUTE(PROCTIME)> - <TIME ATTRIBUTE(PROCTIME)>'. 
Supported form(s): '<NUMERIC> - <NUMERIC>' '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' '<DATETIME> - <DATETIME_INTERVAL>'

我试过把时间投射到我所知道的每一种类型上(试图到达数字的应许之地),但我就是找不到如何让它发挥作用。
我错过什么了吗?proctime是一种非常特殊的“系统变更号”时间,你不能转换吗?如果是这样的话,还必须有某种方法将其与hop\u start(proctime,…)值进行比较。

knpiaxh1

knpiaxh11#

您可以使用timestampdiff减去两个时间点(参见文档)。你是这样用的

TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)

其中时间点单位可以是秒、分钟、小时、天、月或年。
我还没有尝试过处理时间,但它确实适用于事件时间字段,所以希望它能。

相关问题