我已经为我的事件流创建了一个自定义触发器和处理函数。
DataStream<DynamoDBRow> dynamoDBRows =
sensorEvents
.keyBy("id")
.window(GlobalWindows.create())
.trigger(new MyCustomTrigger())
.allowedLateness(Time.minutes(1)) # Note
.process(new MyCustomWindowProcessFunction());
我的触发器基于事件参数。一旦接收到事件结束信号,mycustomwindowprocessfunction()将应用于窗口元素。
@Slf4j
public class MyCustomTrigger extends Trigger<SensorEvent, GlobalWindow> {
@Override
public TriggerResult onElement(SensorEvent element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
if (element.isEventEnd() == true) {
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
}
传感器数据可能很少,甚至在触发后也可能出现。所以我补充说 .allowedLateness(Time.minutes(1))
,以确保在处理时不会遗漏这些事件。
在我看来,允许迟到是行不通的。
翻阅文件后,我发现了这个
如何在全局窗口中包含allowedlateness?
注:我还尝试设置环境时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
更新:20-02-2020
目前正在考虑以下方法(到目前为止没有工作)
@Slf4j
public class JourneyTrigger extends Trigger<SensorEvent, GlobalWindow> {
private final long allowedLatenessMillis;
public JourneyTrigger(Time allowedLateness) {
this.allowedLatenessMillis = allowedLateness.toMilliseconds();
}
@Override
public TriggerResult onElement(SensorEvent element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
if (element.isEventEnd() == true) {
log.info("Timer started with allowedLatenessMillis " + allowedLatenessMillis);
ctx.registerEventTimeTimer(System.currentTimeMillis() + allowedLatenessMillis);
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
log.info("onEvenTime called at "+System.currentTimeMillis() );
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
}
2条答案
按热度按时间rekjcdws1#
最后,我使用下面的自定义触发器实现了我的要求。
同样在
Driver.java
类,设置环境时间特征hivapdat2#
老实说,我不认为有理由使用
GlobalWindow
在这里。你可以用KeyedProcessFunction
这和你的计划是一样的Trigger
,基本上,它将从事件开始到事件结束的所有元素聚集到ListState
当你收到isEventEnd()==true
,您可以简单地安排EventTime
一分钟后启动的计时器,会发出内部收集的结果ListState
.