我正在使用flink 1.2-snapshot。我的数据如下所示:
id=25398102,源id=1,ts=2016-10-15 00:00:56,用户=14,值=919
id=25398185,源id=1,ts=2016-10-15 00:01:06,用户=14,值=920
id=25398210,源id=1,ts=2016-10-15 00:01:16,用户=14,值=944
id=25398235,源id=1,ts=2016-10-15 00:01:24,用户=3149,值=944
id=25398236,源id=1,ts=2016-10-15 00:01:25,用户=71,值=955
id=25398239,源id=1,ts=2016-10-15 00:01:26,用户=71,值=955
id=25398265,源id=1,ts=2016-10-15 00:01:36,用户=71,值=955
id=25398310,源id=1,ts=2016-10-15 00:02:16,用户=14,值=960
id=25398320,源id=1,ts=2016-10-15 00:02:26,用户=14,值=1000
我正在运行以下代码来创建基于windows的用户ID:
stream.flatMap(new LogsParser())
.assignTimestampsAndWatermarks(new MessageTimestampExtractor())
.keyBy("sourceId")
.window(GlobalWindows.create())
.trigger(PurgingTrigger.of(new MySessionTrigger()))
.apply(new SessionWindowFunction())
.print();
mysession trigger查看接收到的事件并检查用户id以触发用户id更改窗口。sessionwindow函数只是在窗口外创建一个会话。
以下是创建的会话:
会议:
id=25398102,源id=1,ts=2016-10-15 00:00:56,用户=14,值=919
id=25398185,源id=1,ts=2016-10-15 00:01:06,用户=14,值=920
id=25398210,源id=1,ts=2016-10-15 00:01:16,用户=14,值=944
id=25398235,源id=1,ts=2016-10-15 00:01:24,用户=3149,值=944
会议:
id=25398236,源id=1,ts=2016-10-15 00:01:25,用户=71,值=955
id=25398239,源id=1,ts=2016-10-15 00:01:26,用户=71,值=955
id=25398265,源id=1,ts=2016-10-15 00:01:36,用户=71,值=955
id=25398310,源id=1,ts=2016-10-15 00:02:16,用户=14,值=960
会议:
id=25398320,源id=1,ts=2016-10-15 00:02:26,用户=14,值=1000
您可以看到的问题是,在每个会话中,最后一个事件实际上属于下一个窗口。由于最后一个事件已经在窗口中,所以触发窗口的决定有些晚了。
如果不考虑窗口中的最后一个事件,如何触发该窗口?
1条答案
按热度按时间sd2nnvve1#
一个想法是使用flatmap在用户id改变时向流中插入标记。然后,触发器函数可以在看到这些标记之一时触发,会话窗口函数可以过滤掉这些标记。