window还有哪些其他参数?

qxsslcnc  于 2021-06-26  发布在  Flink
关注(0)|答案(0)|浏览(218)

我试图在flink中使用windows函数,但是我不能像我想的那样控制时间窗口。
我需要使用如下聚合函数。

SELECT
         *,
         count(id) OVER(PARTITION BY ciudad) AS c_profit,
         count(id) OVER(PARTITION BY distrito) AS c_profit,
         count(id) OVER(PARTITION BY c_poblado) AS c_profit
       FROM fm
       ORDER BY id, ciudad;

累计聚合应每5秒一次。在文档中,我只能找到两种选择:
窗口上的行:元素的每一行都被视为一个新的计算行。也就是说,每行对应一个新窗口。
窗口范围:具有相同时间戳值的所有元素行被视为同一计算行,并且属于同一窗口。
但他们没有一个按照我的需要行事。行按行数计算,并创建重叠的时间窗口(如滑动窗口),即每个新事件都将根据前25秒的事件进行评估。
我用下面的例子进行了测试。

val result4 = tEnv.sqlQuery(
      """
        |SELECT
        |    user, product, amount,
        |    COUNT(user) OVER (
        |        PARTITION BY product
        |        ORDER BY proctime
        |        RANGE
        |        BETWEEN INTERVAL '25' SECOND preceding AND CURRENT ROW) AS conteo
        |  FROM OrdersB
        |""".stripMargin)

我正在为timeinterval参数寻找其他选项

SELECT 
    agg1(col1) OVER(
     [PARTITION BY (value_expression1,..., value_expressionN)] 
     ORDER BY timeCol
     RANGE 
     BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName, 
... 
FROM Tab1

类似于在分组方式中滚动窗口的行为,分组方式是按单独的时间块

也许可以通过数据流或sql以其他方式完成。我的数据源是kafka类型的主题,测试输入数据是:

10,"CUZ",3
20,"CUZ",6
30,"CUZ",5
40,"LIM",3
50,"CUZ",4
60,"LIM",3
70,"AQP",1
80,"AQP",1
90,"AQP",1

输出应该是这样的

10,"CUZ",3
20,"CUZ",6
30,"CUZ",5
40,"LIM",3
50,"CUZ",4
60,"LIM",3
70,"AQP",1
80,"AQP",1
90,"AQP",1

另一个问题是聚合不是在单个块中完成的,而是逐个事件完成的,最后一个事件是具有总聚合的事件

val result4 = tEnv.sqlQuery(
      """
        |SELECT
        |    user, product, amount,proctime,
        |    COUNT(user) OVER (
        |        PARTITION BY product
        |        ORDER BY proctime
        |        RANGE BETWEEN INTERVAL '25' SECOND preceding AND CURRENT ROW) AS conteo
        |  FROM OrdersB
        |""".stripMargin)
1> (true,(10,"CUZ",3,2020-05-18 02:06:10.518,1))
1> (true,(20,"CUZ",6,2020-05-18 02:06:21.096,2))
1> (true,(30,"CUZ",5,2020-05-18 02:06:25.538,3))
1> (true,(40,"LIM",3,2020-05-18 02:06:29.063,1))
1> (true,(50,"CUZ",4,2020-05-18 02:06:32.805,4))
1> (true,(60,"LIM",3,2020-05-18 02:06:41.459,2))
1> (true,(70,"AQP",1,2020-05-18 02:06:44.698,1))
1> (true,(80,"AQP",1,2020-05-18 02:06:48.331,2))
1> (true,(90,"AQP",1,2020-05-18 02:06:52.275,3))

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题