在flink的聚合原语中有一个等价于hop\u start的

hfyxw5xn  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(443)

我试图在flinksql中做一个跳跃窗口上的指数衰减移动平均。我需要访问窗口的一个边框,跳转从以下位置开始:

SELECT                                                                              
      lb_index one_key,
    -- I have access to this one:
      HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND) start_time,
    -- Aggregation primitive:
      SUM(
        Y * EXP(TIMESTAMPDIFF(
          SECOND, 
          proctime, 
    -- This one throws:
          HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
      )))
    FROM write_position                                                                
    GROUP BY lb_index, HOP(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)

我得到以下堆栈跟踪:

11:55:37.011 [main] DEBUG o.a.c.p.RelOptPlanner - For final plan, using Aggregate(groupBy: (lb_index), window: (SlidingGroupWindow('w$, 'proctime, 5000.millis, 50.millis)), select: (lb_index, SUM($f2) AS Y, start('w$) AS w$start, end('w$) AS w$end, proctime('w$) AS w$proctime))
11:55:37.011 [main] DEBUG o.a.c.p.RelOptPlanner - For final plan, using Calc(select: (lb_index, proctime, *(payload.Y, EXP(/(CAST(/INT(Reinterpret(-(HOP_START(PROCTIME(proctime), 50, 5000), PROCTIME(proctime))), 1000)), 1000))) AS $f2))
11:55:37.011 [main] DEBUG o.a.c.p.RelOptPlanner - For final plan, using rel#459:DataStreamScan.DATASTREAM.true.Acc(table=[_DataStreamTable_0])
Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: Unsupported call: HOP_START 
If you think this function should be supported, you can create an issue and start a discussion for it.
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1027)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)

它确实说,当它在聚合和之外工作时,它没有实现。所以我觉得这是个范围问题。
现在,问题是:我可以转换这个表达式,在聚合之外做最后的处理,比如exp(x+y)=exp(x)*exp(y);但我一直坚持使用timestampdiff(这在我上一期中创造了奇迹)。我还没有找到将时间属性转换为数字类型的方法;而且,即使我缩小unix时间戳的比例,我也不喜欢将其指数化。
不管怎样,这项工作会有点笨重,可能会有另一种方法。我不知道如何在这个sql片段中按摩作用域,使其仍然“在”窗口作用域中,并且在没有抛出的情况下拥有开始时间。

lh80um4z

lh80um4z1#

我建议您尝试使用hop\u proctime()而不是hop\u start()。这里解释了这些区别,但是效果是您将拥有proctime属性而不是timestamp,我希望这会使timestampdiff感到高兴。

相关问题