Flink custom trigger not firing onProcessingTime

zdwk9cvp, posted on 2023-05-12 in Apache

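I have implemented a custom trigger that is supposed to fire when either maxElements elements have been processed or timeoutMs has expired. The maxElements part works and fires as expected, but the timeout never fires.
In the main Flink job I also tried env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);, but that did not help.
Can anyone point me in the right direction? I'm lost and don't know where to go from here. Here is my implementation: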

public class ElementCountOrTimeoutTrigger<W extends Window> extends Trigger<Object, W> {

    private final long maxElements;
    private final long timeoutMs;
    private long elementCount = 0;
    private long lastTimestamp = Long.MIN_VALUE;

    public ElementCountOrTimeoutTrigger(long maxElements, long timeoutMs) {
        this.maxElements = maxElements;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        elementCount++;
        lastTimestamp = timestamp;
        if (elementCount >= maxElements) {
            elementCount = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        System.out.println("On processing time called");
        System.out.println("Time: " + time);
        System.out.println("Last timestamp: " + lastTimestamp);
        System.out.println("Timeout: " + timeoutMs);
        if (time >= lastTimestamp + timeoutMs) {
            elementCount = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        elementCount = 0;
        lastTimestamp = Long.MIN_VALUE;
    }

    public static <W extends Window> ElementCountOrTimeoutTrigger<W> of(long maxElements, long timeoutMs) {
        return new ElementCountOrTimeoutTrigger<>(maxElements, timeoutMs);
    }
}

Edit:
I have incorporated a timer, but it does not honor the delete-timer call:

public class ElementCountOrTimeTrigger<W extends Window> extends Trigger<Object, W> {

    private final long maxElements;
    private final long timeoutMs;
    private int elementCount = Integer.MIN_VALUE;
    private long lastTimestamp = Long.MIN_VALUE;
    private long lastTimerExpire = Long.MIN_VALUE;

    public ElementCountOrTimeTrigger(long maxElements, long timeoutMs) {
        this.maxElements = maxElements;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        elementCount++;
        lastTimestamp = ctx.getCurrentProcessingTime();
        if (lastTimerExpire > lastTimestamp) {
            ctx.deleteProcessingTimeTimer(lastTimerExpire);
            System.out.println("Removed the timer due to new element for time: " + lastTimerExpire);
        }
        lastTimerExpire = lastTimestamp + timeoutMs;
        ctx.registerProcessingTimeTimer(lastTimerExpire);
        System.out.println("Registered timer until " + lastTimerExpire);
        if (elementCount >= maxElements) {
            elementCount = 0;
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        System.out.println("Processing time called for time: " + time + " and last timer expire: " + lastTimerExpire);
        System.out.println("Firing and purging");
        elementCount = Integer.MIN_VALUE;
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        elementCount = Integer.MIN_VALUE;
        ctx.deleteProcessingTimeTimer(lastTimerExpire);
    }

    public static <W extends Window> ElementCountOrTimeTrigger<W> of(long maxElements, long timeoutMs) {
        return new ElementCountOrTimeTrigger<>(maxElements, timeoutMs);
    }
}

The logs it produces:

Registered timer until 1683405995456
Removed the timer due to new element for time: 1683405995456
Registered timer until 1683405997053
Removed the timer due to new element for time: 1683405997053
Registered timer until 1683405999054
Removed the timer due to new element for time: 1683405999054
Registered timer until 1683406001054
Removed the timer due to new element for time: 1683406001054
Registered timer until 1683406003054
Removed the timer due to new element for time: 1683406003054
Registered timer until 1683406005055
Processing time called for time: 1683405995456 and last timer expire: 1683406005055
Firing and purging
Processing time called for time: 1683405997053 and last timer expire: 1683406005055
Firing and purging
Removed the timer due to new element for time: 1683406005055
Registered timer until 1683406007055
Removed the timer due to new element for time: 1683406007055
Registered timer until 1683406009055
Processing time called for time: 1683405999054 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406001054 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406003054 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406005055 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406007055 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406009055 and last timer expire: 1683406009055
Firing and purging

Answer #1 (zf2sa74q)

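You have not registered a processing-time timer, so onProcessingTime is never called.
You can model the timeout part of your custom trigger on the built-in ProcessingTimeTrigger, which you can find here: https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
See also this answer, which provides a complete example that is very similar: https://stackoverflow.com/a/49895802/2000823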
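One more thing to check in the edited version: the Trigger Javadoc warns that a trigger must not keep state in its own fields, because the same trigger object can be reused for different keys, and that state should go through the TriggerContext (for example ctx.getPartitionedState(...)). Timers are scoped to the current key and window, so calling ctx.deleteProcessingTimeTimer(lastTimerExpire) with a timestamp that was registered under another key or window would not remove anything. That may explain why the log shows timers firing after they were "removed", although I cannot confirm it without seeing the job. The sketch below is a plain-Java model of the bookkeeping, not Flink code: the class CountOrTimeoutModel, its maps, and advanceTo are hypothetical stand-ins for partitioned state and the timer service, and it assumes the timeout restarts on every element, as in the edited trigger.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Library-free model of count-or-timeout bookkeeping. This is NOT Flink code:
// the maps are hypothetical stand-ins for Flink's per-key state and timer service.
public class CountOrTimeoutModel {

    private final long maxElements;
    private final long timeoutMs;

    // Stand-in for partitioned state: element count and pending timer per key.
    private final Map<String, Long> countByKey = new HashMap<>();
    private final Map<String, Long> timerByKey = new HashMap<>();

    // Stand-in for the timer service: due time -> keys with a timer at that time.
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();

    // Record of every firing as "key:reason", in firing order.
    public final List<String> fired = new ArrayList<>();

    public CountOrTimeoutModel(long maxElements, long timeoutMs) {
        this.maxElements = maxElements;
        this.timeoutMs = timeoutMs;
    }

    // Plays the role of onElement: restart this key's timeout, fire on count.
    public void onElement(String key, long now) {
        long count = countByKey.merge(key, 1L, Long::sum);
        deleteTimer(key); // only ever touches this key's own timer
        if (count >= maxElements) {
            fire(key, "count");
            return;
        }
        long due = now + timeoutMs;
        timerByKey.put(key, due);
        timers.computeIfAbsent(due, t -> new ArrayList<>()).add(key);
    }

    // Plays the role of onProcessingTime: fire all timers due at or before now.
    public void advanceTo(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            for (String key : timers.pollFirstEntry().getValue()) {
                timerByKey.remove(key);
                fire(key, "timeout");
            }
        }
    }

    // Removes the pending timer of this key, if it has one.
    private void deleteTimer(String key) {
        Long due = timerByKey.remove(key);
        if (due == null) {
            return;
        }
        List<String> keys = timers.get(due);
        keys.remove(key);
        if (keys.isEmpty()) {
            timers.remove(due);
        }
    }

    private void fire(String key, String reason) {
        fired.add(key + ":" + reason);
        countByKey.remove(key); // corresponds to FIRE_AND_PURGE resetting the count
    }
}
```

In a real Trigger the two per-key maps would become state obtained from ctx.getPartitionedState(...), and the TreeMap would be replaced by ctx.registerProcessingTimeTimer(...) and ctx.deleteProcessingTimeTimer(...). Note that the model also deletes the pending timer when the count fires, which the edited trigger does not do, so a stale timer can still fire there later.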
