我想在apache flink中创建键控窗口,这样每个键的窗口在键的第一个事件到达n分钟后执行。是否可以使用事件时间特性来完成(因为处理时间取决于系统时钟,并且不确定第一个事件何时到达)。如果可能,请解释事件时间和水印的分配,并解释如何在n分钟后调用进程窗口函数。
下面是代码的一部分,可以让您了解我目前的工作:
//Make keyed events so as to start a window for a key
KeyedStream<SourceData, Tuple> keyedEvents =
env.addSource(new MySource(configData),"JSON Source")
.assignTimestampsAndWatermarks(new MyTimeStamps())
.setParallelism(1)
.keyBy("service");
//Start a window for windowTime time
DataStream<ResultData> resultData=
keyedEvents
.timeWindow(Time.minutes(winTime))
.process(new ProcessEventWindow(configData))
.name("Event Collection Window")
.setParallelism(25);
那么,如何分配事件时间和wateramark,使窗口跟随第一个事件的事件时间作为起点,并在10分钟后执行(对于不同的键,第一个事件的开始时间可能不同)。任何帮助都将不胜感激。
/------------ ( window of 10 minutes )
Streams |------------ ( window of 10 minutes )
\------------ ( window of 10 minutes )
edit:用于分配时间戳和水印的类i
public class MyTimeStamps implements AssignerWithPeriodicWatermarks<SourceData> {
@Override
public long extractTimestamp(SourceData element, long previousElementTimestamp) {
//Will return epoch of currentTime
return GlobalUtilities.getCurrentEpoch();
}
@Override
public Watermark getCurrentWatermark() {
// TODO Auto-generated method stub
//Will return epoch of currentTime + 10 minutes
return new Watermark(GlobalUtilities.getTimeShiftNMinutesEpoch(10));
}
}
2条答案
按热度按时间oiopk7p51#
我有一个类似的问题,不久前关于事件时间窗口。这是我的溪流的样子
我的wmassigner类如下所示(注意:这允许1分钟的无序事件发生,如果不想允许延迟,可以扩展不同的时间戳提取器):
我想用于水印的时间戳是data.ts字段。
我的windowprocessor:
有什么不清楚的请告诉我
s4chpxco2#
我认为对于您的用例,最好使用processfunction。您可以在第一个事件出现时注册一个eventtimetimer。比在
onTimer
方法发出结果。比如: