flink滑动窗口不能像预期的那样工作

p4tfgftt  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(656)

我有一个数据流1,2,3,4,5,6。。。。。
我应用滑动窗口如下
inputstream.keyby(“id”).countwindow(2,1);
预期产量
1,2
2,3
3,4 ..
实际产量
1
1,2
2,3
3,4
为什么在累积窗口大小之前它会先滑动

3z6pesqy

3z6pesqy1#

首先,您提供的预期输出是错误的。您指定的窗口大小为2分钟。所以输出(假设是窗口的开始和结束)应该是:

1:00:00, 1:01:00
1:01:00, 1:02:00

第一个带有时间戳的事件 1:00:00 应分配给windows (0:59:00, 1:01:00) 以及 (1:00:00, 1:02:00) . 我相信这回答了你的问题。
编辑后:
对于countwindow,应用相同的规则。第一个元素属于两个窗口。用一个简单的方法来推理是比较容易的 countWindow(4,2) . 请看一个基本示例:

val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
sEnv.setParallelism(1)

sEnv.fromCollection((1 to 10)).countWindowAll(4, 2).apply(
  (window, numbers, collector: Collector[Seq[Int]]) =>
    collector.collect(numbers.toSeq)
).print()

sEnv.execute()

输出为:

List(1, 2)
List(1, 2, 3, 4)
List(3, 4, 5, 6)
List(5, 6, 7, 8)
List(7, 8, 9, 10)

请注意,第一个元素所属的第一个窗口从过去开始。

i1icjdpr

i1icjdpr2#

多亏了dawid wysakowicz的回答我才明白。我只想加一个数字,希望能帮助理解。

实际上,在滑动窗口中,每个元素必须包含在两个窗口中。也就是说,第一个元素也必须在两个窗口中。

相关问题