我有一个数据流1,2,3,4,5,6。。。。。我应用滑动窗口如下inputstream.keyby(“id”).countwindow(2,1);预期产量1,22,33,4 ..实际产量11,22,33,4为什么在累积窗口大小之前它会先滑动
3z6pesqy1#
首先,您提供的预期输出是错误的。您指定的窗口大小为2分钟。所以输出(假设是窗口的开始和结束)应该是:
1:00:00, 1:01:00 1:01:00, 1:02:00
第一个带有时间戳的事件 1:00:00 应分配给windows (0:59:00, 1:01:00) 以及 (1:00:00, 1:02:00) . 我相信这回答了你的问题。编辑后:对于countwindow,应用相同的规则。第一个元素属于两个窗口。用一个简单的方法来推理是比较容易的 countWindow(4,2) . 请看一个基本示例:
1:00:00
(0:59:00, 1:01:00)
(1:00:00, 1:02:00)
countWindow(4,2)
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment sEnv.setParallelism(1) sEnv.fromCollection((1 to 10)).countWindowAll(4, 2).apply( (window, numbers, collector: Collector[Seq[Int]]) => collector.collect(numbers.toSeq) ).print() sEnv.execute()
输出为:
List(1, 2) List(1, 2, 3, 4) List(3, 4, 5, 6) List(5, 6, 7, 8) List(7, 8, 9, 10)
请注意,第一个元素所属的第一个窗口从过去开始。
i1icjdpr2#
多亏了dawid wysakowicz的回答我才明白。我只想加一个数字,希望能帮助理解。实际上,在滑动窗口中,每个元素必须包含在两个窗口中。也就是说,第一个元素也必须在两个窗口中。
2条答案
按热度按时间3z6pesqy1#
首先,您提供的预期输出是错误的。您指定的窗口大小为2分钟。所以输出(假设是窗口的开始和结束)应该是:
第一个带有时间戳的事件
1:00:00
应分配给windows(0:59:00, 1:01:00)
以及(1:00:00, 1:02:00)
. 我相信这回答了你的问题。编辑后:
对于countwindow,应用相同的规则。第一个元素属于两个窗口。用一个简单的方法来推理是比较容易的
countWindow(4,2)
. 请看一个基本示例:输出为:
请注意,第一个元素所属的第一个窗口从过去开始。
i1icjdpr2#
多亏了dawid wysakowicz的回答我才明白。我只想加一个数字,希望能帮助理解。
实际上,在滑动窗口中,每个元素必须包含在两个窗口中。也就是说,第一个元素也必须在两个窗口中。