java—如何实现flink事件时间触发器,该触发器在x分钟内未收到任何事件后发出

ukdjmx9f  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(432)

我在努力理解flink触发器是如何工作的。我的数据流包含具有sessionid的事件,我根据sessionid聚合了sessionid。每个会话将包含一个已启动和一个已结束的事件,但有时已结束的事件将丢失。
为了处理这个问题,我设置了一个触发器,每当处理结束的事件时,它都会发出聚合会话。但是,如果在2分钟内没有任何事件从该会话到达,我希望发出我们迄今为止聚合的任何消息(我们发送事件的应用程序每分钟发送一次心跳,因此如果我们没有收到任何事件,该会话将被视为丢失)。
我设置了以下触发函数:

public class EventTimeProcessingTimeTrigger extends Trigger<HashMap, TimeWindow> {
    private final long sessionTimeout;
    private long lastSetTimer;

    // Max session length set to 1 day
    public static final long MAX_SESSION_LENGTH = 1000l * 86400l;

    // End session events
    private static ImmutableSet<String> endSession = ImmutableSet.<String>builder()
            .add("Playback.Aborted")
            .add("Playback.Completed")
            .add("Playback.Error")
            .add("Playback.StartAirplay")
            .add("Playback.StartCasting")
            .build();

    public EventTimeProcessingTimeTrigger(long sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    @Override
    public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        lastSetTimer = ctx.getCurrentProcessingTime() + sessionTimeout;
        ctx.registerProcessingTimeTimer(lastSetTimer);

        if(endSession.contains(element.get(Field.EVENT_TYPE))) {
            return TriggerResult.FIRE_AND_PURGE;
        }

        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return time == window.maxTimestamp() ?
                TriggerResult.FIRE_AND_PURGE :
                TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(lastSetTimer);
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window,
                        OnMergeContext ctx) {
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + sessionTimeout);
    }
}

为了设置事件的水印,我使用应用程序设置的水印,因为appeventtime可能与服务器上的wallclock不同。我提取如下水印:

DataStream<HashMap> playerEvents = env
                .addSource(kafkaConsumerEvents, "playerEvents(Kafka)")
                .name("Read player events from Kafka")
                .uid("Read player events from Kafka")
                .map(json -> DECODER.decode(json, TypeToken.of(HashMap.class))).returns(HashMap.class)
                .name("Map Json to HashMap")
                .uid("Map Json to HashMap")
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<HashMap>(org.apache.flink.streaming.api.windowing.time.Time.seconds(30))
                {
                    @Override
                    public long extractTimestamp(HashMap element)
                    {
                        long timestamp = 0L;
                        Object timestampAsObject = (Object) element.get("CanonicalTime");
                        timestamp = (long)timestampAsObject;
                        return timestamp;
                    }
                })
                .name("Add CanonicalTime as timestamp")
                .uid("Add CanonicalTime as timestamp");

现在我发现奇怪的是,当我在debug中运行代码并在触发器的clear函数中设置断点时,它会不断被调用。即使在触发器中没有达到火和净化点。所以我觉得我完全误解了触发器的工作原理。我的实现根本没有做我认为它在做的事情。
我想我的问题是,什么时候该被触发?这是实现eventtimetrigger和processingtimetrigger组合的正确方法吗?
感谢所有能给我的帮助。
更新1:(2020-05-29)
以便提供有关如何设置的更多信息。我将我的环境设置如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.failureRateRestart(60, Time.of(60, TimeUnit.MINUTES), Time.of(60, TimeUnit.SECONDS)));
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

所以我对整个流使用eventtime。然后我创建如下窗口:

DataStream<PlayerSession> playerSessions = sideEvents
                .keyBy((KeySelector<HashMap, String>) event -> (String) event.get(Field.SESSION_ID))
                .window(ProcessingTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.minutes(5)))
                .trigger(new EventTimeProcessingTimeTrigger(SESSION_TIMEOUT))
                .aggregate(new SessionAggregator())
                .name("Aggregate events into sessions")
                .uid("Aggregate events into sessions");
qpgpyjmq

qpgpyjmq1#

这种情况很复杂。我不太确定这段代码到底会做什么,但我可以解释其中的一些情况。
要点1:您已经将时间特性设置为事件时间,安排了时间戳和水印,并实现了 onEventTime 扣动扳机。但是没有地方可以创建事件时间计时器。除非我漏掉了什么,否则实际上没有任何东西使用事件时间或水印。你还没有实现一个事件时间触发器,我也没料到 onEventTime 将永远被召唤。
第二点:你的触发器不需要清除。作为清除窗口的一部分,flink负责调用clear on触发器。
第三点:你的扳机试图反复发射和吹扫Windows,这似乎不对。我这样说是因为您正在为每个元素创建一个新的处理时间计时器,当每个计时器启动时,您正在启动并清除窗口。你可以随时打开Windows,但你只能吹一次,吹完就不见了。
要点4:会话窗口是一种特殊的窗口,称为合并窗口。当会话合并时(事件到达时总是发生),它们的触发器被合并,其中一个被清除。这就是为什么你经常被打电话给clear。
建议:由于你每分钟有一次保持活动,并且打算在2分钟不活动后关闭会话,因此似乎可以将会话间隔设置为2分钟,这样可以避免相当多的事情变得如此复杂。让会话窗口执行它们设计的任务。
假设这是可行的,那么你可以简单地扩展flink的 ProcessingTimeTrigger 并覆盖其 onElement 方法:

@Override
public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {

    if (endSession.contains(element.get(Field.EVENT_TYPE))) {
        return TriggerResult.FIRE_AND_PURGE;
    }

    return super(element, timestamp, window, ctx);
}

以这种方式,窗口将在两分钟不活动后触发,或者由显式会话结束事件触发。
你应该可以简单地继承 ProcessingTimeTrigger 的行为。
如果要使用事件时间,请使用 EventTimeTrigger 作为超类,您必须找到一种方法来确保即使流处于空闲状态,您的水印也能取得进展。看看这个答案如何处理。

相关问题