我在努力理解flink触发器是如何工作的。我的数据流包含具有sessionid的事件,我根据sessionid聚合了sessionid。每个会话将包含一个已启动和一个已结束的事件,但有时已结束的事件将丢失。
为了处理这个问题,我设置了一个触发器,每当处理结束的事件时,它都会发出聚合会话。但是,如果在2分钟内没有任何事件从该会话到达,我希望发出我们迄今为止聚合的任何消息(我们发送事件的应用程序每分钟发送一次心跳,因此如果我们没有收到任何事件,该会话将被视为丢失)。
我设置了以下触发函数:
public class EventTimeProcessingTimeTrigger extends Trigger<HashMap, TimeWindow> {
private final long sessionTimeout;
private long lastSetTimer;
// Max session length set to 1 day
public static final long MAX_SESSION_LENGTH = 1000l * 86400l;
// End session events
private static ImmutableSet<String> endSession = ImmutableSet.<String>builder()
.add("Playback.Aborted")
.add("Playback.Completed")
.add("Playback.Error")
.add("Playback.StartAirplay")
.add("Playback.StartCasting")
.build();
public EventTimeProcessingTimeTrigger(long sessionTimeout) {
this.sessionTimeout = sessionTimeout;
}
@Override
public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
lastSetTimer = ctx.getCurrentProcessingTime() + sessionTimeout;
ctx.registerProcessingTimeTimer(lastSetTimer);
if(endSession.contains(element.get(Field.EVENT_TYPE))) {
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return time == window.maxTimestamp() ?
TriggerResult.FIRE_AND_PURGE :
TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteProcessingTimeTimer(lastSetTimer);
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(TimeWindow window,
OnMergeContext ctx) {
ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + sessionTimeout);
}
}
为了设置事件的水印,我使用应用程序设置的水印,因为appeventtime可能与服务器上的wallclock不同。我提取如下水印:
DataStream<HashMap> playerEvents = env
.addSource(kafkaConsumerEvents, "playerEvents(Kafka)")
.name("Read player events from Kafka")
.uid("Read player events from Kafka")
.map(json -> DECODER.decode(json, TypeToken.of(HashMap.class))).returns(HashMap.class)
.name("Map Json to HashMap")
.uid("Map Json to HashMap")
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<HashMap>(org.apache.flink.streaming.api.windowing.time.Time.seconds(30))
{
@Override
public long extractTimestamp(HashMap element)
{
long timestamp = 0L;
Object timestampAsObject = (Object) element.get("CanonicalTime");
timestamp = (long)timestampAsObject;
return timestamp;
}
})
.name("Add CanonicalTime as timestamp")
.uid("Add CanonicalTime as timestamp");
现在我发现奇怪的是,当我在debug中运行代码并在触发器的clear函数中设置断点时,它会不断被调用。即使在触发器中没有达到火和净化点。所以我觉得我完全误解了触发器的工作原理。我的实现根本没有做我认为它在做的事情。
我想我的问题是,什么时候该被触发?这是实现eventtimetrigger和processingtimetrigger组合的正确方法吗?
感谢所有能给我的帮助。
更新1:(2020-05-29)
以便提供有关如何设置的更多信息。我将我的环境设置如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(60, Time.of(60, TimeUnit.MINUTES), Time.of(60, TimeUnit.SECONDS)));
env.enableCheckpointing(5000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
所以我对整个流使用eventtime。然后我创建如下窗口:
DataStream<PlayerSession> playerSessions = sideEvents
.keyBy((KeySelector<HashMap, String>) event -> (String) event.get(Field.SESSION_ID))
.window(ProcessingTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.minutes(5)))
.trigger(new EventTimeProcessingTimeTrigger(SESSION_TIMEOUT))
.aggregate(new SessionAggregator())
.name("Aggregate events into sessions")
.uid("Aggregate events into sessions");
1条答案
按热度按时间qpgpyjmq1#
这种情况很复杂。我不太确定这段代码到底会做什么,但我可以解释其中的一些情况。
要点1:您已经将时间特性设置为事件时间,安排了时间戳和水印,并实现了
onEventTime
扣动扳机。但是没有地方可以创建事件时间计时器。除非我漏掉了什么,否则实际上没有任何东西使用事件时间或水印。你还没有实现一个事件时间触发器,我也没料到onEventTime
将永远被召唤。第二点:你的触发器不需要清除。作为清除窗口的一部分,flink负责调用clear on触发器。
第三点:你的扳机试图反复发射和吹扫Windows,这似乎不对。我这样说是因为您正在为每个元素创建一个新的处理时间计时器,当每个计时器启动时,您正在启动并清除窗口。你可以随时打开Windows,但你只能吹一次,吹完就不见了。
要点4:会话窗口是一种特殊的窗口,称为合并窗口。当会话合并时(事件到达时总是发生),它们的触发器被合并,其中一个被清除。这就是为什么你经常被打电话给clear。
建议:由于你每分钟有一次保持活动,并且打算在2分钟不活动后关闭会话,因此似乎可以将会话间隔设置为2分钟,这样可以避免相当多的事情变得如此复杂。让会话窗口执行它们设计的任务。
假设这是可行的,那么你可以简单地扩展flink的
ProcessingTimeTrigger
并覆盖其onElement
方法:以这种方式,窗口将在两分钟不活动后触发,或者由显式会话结束事件触发。
你应该可以简单地继承
ProcessingTimeTrigger
的行为。如果要使用事件时间,请使用
EventTimeTrigger
作为超类,您必须找到一种方法来确保即使流处于空闲状态,您的水印也能取得进展。看看这个答案如何处理。