如何在flink中实现一个触发器,该触发器缓冲到超时并在超时结束时触发?

fwzugrvs  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(400)

如何在flink中实现一个触发器,该触发器缓冲到超时并在超时结束时触发?
如果窗口中至少有一个元素,我希望注册触发器,然后缓冲到一秒钟,并在一秒钟后触发。如果窗口中没有任何元素,那么触发器不会自动注册,所以我不希望看到任何输出。
我不想触发产生大量的交通每一秒,无论元素是否存在于窗口。另一方面说只有一个元素在窗口我不想它坐在那里,直到水印或永远。相反,我希望有一个超时,这样我至少可以在一秒钟后看到一个元素。
ProcessingTimeTrigger.create() 你这样做?如果是的话,它们之间有什么不同 ProcessingTimeTrigger.create()CountinousProcessingTimeTrigger ?

u5rb5r59

u5rb5r591#

正常的一秒钟处理时间窗口将为您提供一个包含一秒钟内发生的所有事件的窗口,对于其中至少有一个事件的任何一秒钟。但是这个窗口不会与第一个事件对齐;它将与一天中的时间时钟对齐。因此,例如,如果窗口中的第一个事件发生在给定的第二个事件的一半,那么该窗口将只包括第一个事件之后500毫秒的事件。
ProcessingTimeTrigger 在Windows的尽头开火一次。一 CountinousProcessingTimeTrigger 以一定的速度反复开火。
为了精确地获得您所描述的语义,您需要一个定制触发器。您可以执行类似于onesecondintervaltrigger示例的操作,只是希望从使用事件时间切换到处理时间,并且只触发一次,而不是重复触发。
会给你留下这样的东西:

public static class OneSecondIntervalTrigger extends Trigger<SensorReading, TimeWindow> {

    @Override
    public TriggerResult onElement(SensorReading r, long ts, TimeWindow w, TriggerContext ctx) throws Exception {
        // firstSeen will be false if not set yet
        ValueState<Boolean> firstSeen = ctx.getPartitionedState(
            new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));

        // register initial timer only for first element
        if (firstSeen.value() == null) {
            // FIRE the window 1000 msec after the first event
            long now = ctx.getCurrentProcessingTime();
            ctx.registerProcessingTimeTimer(now + 1000);
            fireSeen.update(true);
        }
        // Continue. Do not evaluate window now
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
        // Continue. We don't use event time timers
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
        // Evaluate the window now
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public void clear(TimeWindow w, TriggerContext ctx) throws Exception {
        // Clear trigger state
        ValueState<Boolean> firstSeen = ctx.getPartitionedState(
            new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));
        firstSeen.clear();
    }
}

相关问题