I'm trying to use window functions in Flink, but I can't control the time window the way I want.
I need aggregate functions like the following:
```sql
SELECT
*,
count(id) OVER(PARTITION BY ciudad) AS c_profit,
count(id) OVER(PARTITION BY distrito) AS c_profit,
count(id) OVER(PARTITION BY c_poblado) AS c_profit
FROM fm
ORDER BY id, ciudad;
```
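The accumulated aggregation should be computed every 5 seconds. In the documentation I can only find two options:
ROWS OVER window: every row is treated as a new calculation row; that is, each row gets its own window.
RANGE OVER window: all rows with the same timestamp value are treated as the same calculation row and belong to the same window.
But neither behaves the way I need: rows are counted one by one and the time windows overlap (like a sliding window), so each new event is evaluated against the events of the previous 25 seconds.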
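To make the two frame types concrete, here is a plain-Scala simulation (not the Flink API) of `COUNT(*) OVER (ORDER BY ts ... BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)` with a ROWS frame and with a RANGE frame. The object name and the timestamps, including the two rows tied at 2000 ms, are hypothetical; the sketch only shows that a ROWS frame gives tied rows different counts, while a RANGE frame treats them as peers of the same calculation row.

```scala
// Plain Scala simulation of ROWS vs RANGE frames; this is NOT Flink code.
// The timestamps are hypothetical and already sorted ascending.
object FrameSemantics {
  // Two rows share the timestamp 2000 ms.
  val timestamps: List[Long] = List(1000L, 2000L, 2000L, 3000L)

  // ROWS ... UNBOUNDED PRECEDING AND CURRENT ROW:
  // the frame ends at the physical current row, so row i sees i + 1 rows.
  def rowsUnboundedCount(ts: List[Long]): List[Int] =
    (1 to ts.length).toList

  // RANGE ... UNBOUNDED PRECEDING AND CURRENT ROW:
  // the frame ends at the current row's value, so every row with the same
  // timestamp (a peer) is included and tied rows get the same count.
  def rangeUnboundedCount(ts: List[Long]): List[Int] =
    ts.map(t => ts.count(_ <= t))

  def main(args: Array[String]): Unit = {
    println(rowsUnboundedCount(timestamps))
    println(rangeUnboundedCount(timestamps))
  }
}
```

Running `main` should print `List(1, 2, 3, 4)` and then `List(1, 3, 3, 4)`. Either way, one result is produced per input row, which is why neither frame type gives the block-wise behavior asked about here.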
I tested it with the following example:
```scala
val result4 = tEnv.sqlQuery(
"""
|SELECT
| user, product, amount,
| COUNT(user) OVER (
| PARTITION BY product
| ORDER BY proctime
| RANGE
| BETWEEN INTERVAL '25' SECOND preceding AND CURRENT ROW) AS conteo
| FROM OrdersB
|""".stripMargin)
```
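The per-event counts this query produces can be reproduced with a small plain-Scala simulation (again, not the Flink API). The event times below are the `proctime` values from the printed output further down, written as milliseconds after 02:06:00; `SlidingOverSketch` and `overRangeCounts` are hypothetical names. Each row looks back 25 seconds within its own product, so consecutive frames overlap like a sliding window and a new result is emitted for every event.

```scala
// Plain Scala simulation of
//   COUNT(user) OVER (PARTITION BY product ORDER BY proctime
//     RANGE BETWEEN INTERVAL '25' SECOND PRECEDING AND CURRENT ROW)
// This is NOT Flink code; times are ms after 02:06:00 from the printed output.
object SlidingOverSketch {
  final case class Order(user: Int, product: String, amount: Int, tsMs: Long)

  val orders: List[Order] = List(
    Order(10, "CUZ", 3, 10518L),
    Order(20, "CUZ", 6, 21096L),
    Order(30, "CUZ", 5, 25538L),
    Order(40, "LIM", 3, 29063L),
    Order(50, "CUZ", 4, 32805L),
    Order(60, "LIM", 3, 41459L),
    Order(70, "AQP", 1, 44698L),
    Order(80, "AQP", 1, 48331L),
    Order(90, "AQP", 1, 52275L)
  )

  // For each row in arrival order, count the rows seen so far (including the
  // current one) with the same product whose timestamp lies within the
  // preceding rangeMs milliseconds. One result per incoming row.
  def overRangeCounts(rows: List[Order], rangeMs: Long): List[Int] =
    rows.zipWithIndex.map { case (cur, i) =>
      rows.take(i + 1).count { o =>
        o.product == cur.product && o.tsMs >= cur.tsMs - rangeMs
      }
    }

  def main(args: Array[String]): Unit =
    println(overRangeCounts(orders, 25000L))
}
```

Running `main` should print `List(1, 2, 3, 1, 4, 2, 1, 2, 3)`, which matches the `conteo` column of the output. For example, the fourth CUZ event (32.805 s) still sees the first one (10.518 s) because it is less than 25 seconds old.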
I'm looking for other options for the timeInterval parameter in this syntax:
```sql
SELECT
agg1(col1) OVER(
[PARTITION BY (value_expression1,..., value_expressionN)]
ORDER BY timeCol
RANGE
BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName,
...
FROM Tab1
```
I want something that behaves like a tumbling window in GROUP BY, which splits the stream into separate blocks of time.
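For comparison, a tumbling window assigns each event to exactly one fixed block and produces one count per block and key. In Flink SQL this is what the grouped-window function `TUMBLE` (for example `GROUP BY TUMBLE(proctime, INTERVAL '5' SECOND), product`) is meant for; the sketch below only simulates those semantics in plain Scala on the same sample events. `TumblingSketch`, `windowStart` and `tumblingCounts` are hypothetical names, and window boundaries are computed relative to 02:06:00 as a simplifying assumption.

```scala
// Plain Scala simulation of a tumbling-window COUNT per product.
// This is NOT Flink code; times are ms after 02:06:00 (an assumption).
object TumblingSketch {
  final case class Order(product: String, tsMs: Long)

  val orders: List[Order] = List(
    Order("CUZ", 10518L), Order("CUZ", 21096L), Order("CUZ", 25538L),
    Order("LIM", 29063L), Order("CUZ", 32805L), Order("LIM", 41459L),
    Order("AQP", 44698L), Order("AQP", 48331L), Order("AQP", 52275L)
  )

  // Start of the non-overlapping block that contains tsMs.
  def windowStart(tsMs: Long, sizeMs: Long): Long = tsMs - (tsMs % sizeMs)

  // One (windowStart, product, count) result per block and key,
  // instead of one result per event as with an OVER window.
  def tumblingCounts(rows: List[Order], sizeMs: Long): List[(Long, String, Int)] =
    rows
      .groupBy(o => (windowStart(o.tsMs, sizeMs), o.product))
      .map { case ((start, product), group) => (start, product, group.size) }
      .toList
      .sortBy { case (start, product, _) => (start, product) }

  def main(args: Array[String]): Unit = {
    println(tumblingCounts(orders, 5000L))
    println(tumblingCounts(orders, 25000L))
  }
}
```

With `sizeMs = 5000L` each block holds at most one event per product, so every count is 1. With `sizeMs = 25000L` the result is `List((0,CUZ,2), (25000,AQP,2), (25000,CUZ,2), (25000,LIM,2), (50000,AQP,1))`. In both cases each count appears once per block rather than being repeated, growing, on every event.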
Maybe this can be done another way, with the DataStream API or SQL. My data source is a Kafka topic, and the test input data is:
```
10,"CUZ",3
20,"CUZ",6
30,"CUZ",5
40,"LIM",3
50,"CUZ",4
60,"LIM",3
70,"AQP",1
80,"AQP",1
90,"AQP",1
```
The output should look like this:
```
10,"CUZ",3
20,"CUZ",6
30,"CUZ",5
40,"LIM",3
50,"CUZ",4
60,"LIM",3
70,"AQP",1
80,"AQP",1
90,"AQP",1
```
Another problem is that the aggregation is not done in a single block but event by event, and only the last event carries the total aggregate:
```scala
val result4 = tEnv.sqlQuery(
"""
|SELECT
| user, product, amount,proctime,
| COUNT(user) OVER (
| PARTITION BY product
| ORDER BY proctime
| RANGE BETWEEN INTERVAL '25' SECOND preceding AND CURRENT ROW) AS conteo
| FROM OrdersB
|""".stripMargin)
```
```
1> (true,(10,"CUZ",3,2020-05-18 02:06:10.518,1))
1> (true,(20,"CUZ",6,2020-05-18 02:06:21.096,2))
1> (true,(30,"CUZ",5,2020-05-18 02:06:25.538,3))
1> (true,(40,"LIM",3,2020-05-18 02:06:29.063,1))
1> (true,(50,"CUZ",4,2020-05-18 02:06:32.805,4))
1> (true,(60,"LIM",3,2020-05-18 02:06:41.459,2))
1> (true,(70,"AQP",1,2020-05-18 02:06:44.698,1))
1> (true,(80,"AQP",1,2020-05-18 02:06:48.331,2))
1> (true,(90,"AQP",1,2020-05-18 02:06:52.275,3))
```