滑动时间窗口的flink侧输出

a9wyjsp7  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(283)

我有下面的flink管道,它简单地计算窗口中的元素,并在一个单独的流中报告最新的元素

OutputTag<Tuple3<Long, String, Double>> lateItems= new OutputTag<Tuple3<Long, String, Double>>("Late Items"){};
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

env.setParallelism(1);
env.getConfig().setAutoWatermarkInterval(-1);
DataStream<Tuple3<Long,String,Double>> stream = env.addSource(new  YetAnotherSource(fileName));
DataStream<Tuple3<Long,String,Double>> lateStream;
AllWindowedStream<Tuple3<Long, String, Double>, TimeWindow> tuple3TimeWindowAllWindowedStream = stream.windowAll(SlidingEventTimeWindows.of(Time.milliseconds(100), Time.milliseconds(10)));
tuple3TimeWindowAllWindowedStream.sideOutputLateData(lateItems);
lateStream = streamOfResults.getSideOutput(lateItems);
            lateStream.countWindowAll(1).apply(new CounterFunction22()).writeAsText("FlinkSlidingTimeWindowLateItemsResult.txt",FileSystem.WriteMode.OVERWRITE);
streamOfResults.writeAsText("FlinkSlidingTimeWindowOutputFor" + fileName + ".txt", FileSystem.WriteMode.OVERWRITE);

当我传递以下数据作为输入时

1383451269002,A,22
1383451269006,A,18
1383451269007,A,18

* 1383451269010,W,0

1383451269008,A,18
1383451269027,A,20
1383451269028,A,19
1383451269033,A,17
1383451269033,A,17
1383451269030,A,17

* 1383451269038,W,0

1383451269008,A,17

带*的元素是水印。
我得到了预期的结果,第一个窗口包含三个元素。这是因为第五行和最后一行的元素被认为是该窗口的延迟元素。

(1383451268910,1383451269010,3)

但是,在输出端没有生成任何内容。
但是,当我使用会话窗口时,会在输出端生成延迟项。
你知道为什么滑动时间窗什么都不产生吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题