代码如下。根据我的理解,当水印大于窗口结束时间时,窗口将关闭。但我的测试结果非常混乱。
public class Test {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> ds = env.socketTextStream("192.168.10.100", 9999);
SingleOutputStreamOperator<T> map = ds.map(x -> {
String[] s = x.split(" ");
return new T(s[0], Integer.parseInt(s[1]));
});
WatermarkStrategy<T> wms = WatermarkStrategy.<T>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner((t, l) -> {
LocalDateTime now = LocalDateTime.now().minusSeconds(10);
Date from = Date.from(now.atZone(ZoneId.systemDefault()).toInstant());
return from.getTime();
});
map.assignTimestampsAndWatermarks(wms)
.keyBy(T::getId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<T, List<T>, String, TimeWindow>() {
@Override
public void process(String s,
ProcessWindowFunction<T, List<T>, String, TimeWindow>.Context context,
Iterable<T> elements, Collector<List<T>> out) throws Exception {
System.out.println(context.window().getStart() + "----" + context.window().getEnd() + "----" + context.window().maxTimestamp());
List<T> ts = new ArrayList<>();
for (T element : elements) {
ts.add(element);
}
out.collect(ts);
}
}).print();
env.execute();
}
}
字符串
测试结果
1702991405520 1702991406914 1702991408207 1702991409720 1702991411758 1702991413702 1702991414803 1702991418426 1702991419629 1702991420617 1702991421944 1702991423171 1702991424718 1702991426117 1702991427736 1702991429248 1702991405000-1702991410000-170299140999
窗口为1702991405000-1702991410000,但为什么当水印到达1702991429248时窗口关闭并调用处理方法?
请帮助我理解
1条答案
按热度按时间vulvrdjw1#
从你分享的内容中,无法看到1702991429248来自哪里,但这个值不是触发窗口的水印。