我目前实现了一个自定义触发器,它应该在处理maxElements
或timeoutMs
过期时触发。目前,触发器的maxElements部分工作正常,并按预期触发,但从未触发超时。
在Flink主任务中,我也尝试过env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
,但这并没有帮助。
有人能指引我走向正确的方向吗?我迷路了,不知道该往哪里走。下面是我的实现:
public class ElementCountOrTimeoutTrigger<W extends Window> extends Trigger<Object, W> {
private final long maxElements;
private final long timeoutMs;
private long elementCount = 0;
private long lastTimestamp = Long.MIN_VALUE;
public ElementCountOrTimeoutTrigger(long maxElements, long timeoutMs) {
this.maxElements = maxElements;
this.timeoutMs = timeoutMs;
}
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
elementCount++;
lastTimestamp = timestamp;
if (elementCount >= maxElements) {
elementCount = 0;
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
System.out.println("On processing time called");
System.out.println("Time: " + time);
System.out.println("Last timestamp: " + lastTimestamp);
System.out.println("Timeout: " + timeoutMs);
if (time >= lastTimestamp + timeoutMs) {
elementCount = 0;
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
elementCount = 0;
lastTimestamp = Long.MIN_VALUE;
}
public static <W extends Window> ElementCountOrTimeoutTrigger<W> of(long maxElements, long timeoutMs) {
return new ElementCountOrTimeoutTrigger<>(maxElements, timeoutMs);
}
}
编辑:
合并了一个计时器,但它不遵守删除计时器调用:
public class ElementCountOrTimeTrigger<W extends Window> extends Trigger<Object, W> {
private final long maxElements;
private final long timeoutMs;
private int elementCount = Integer.MIN_VALUE;
private long lastTimestamp = Long.MIN_VALUE;
private long lastTimerExpire = Long.MIN_VALUE;
public ElementCountOrTimeTrigger(long maxElements, long timeoutMs) {
this.maxElements = maxElements;
this.timeoutMs = timeoutMs;
}
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
elementCount++;
lastTimestamp = ctx.getCurrentProcessingTime();
if (lastTimerExpire > lastTimestamp) {
ctx.deleteProcessingTimeTimer(lastTimerExpire);
System.out.println("Removed the timer due to new element for time: " + lastTimerExpire);
}
lastTimerExpire = lastTimestamp + timeoutMs;
ctx.registerProcessingTimeTimer(lastTimerExpire);
System.out.println("Registered timer until " + lastTimerExpire);
if (elementCount >= maxElements) {
elementCount = 0;
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
System.out.println("Processing time called for time: " + time + " and last timer expire: " + lastTimerExpire);
System.out.println("Firing and purging");
elementCount = Integer.MIN_VALUE;
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
elementCount = Integer.MIN_VALUE;
ctx.deleteProcessingTimeTimer(lastTimerExpire);
}
public static <W extends Window> ElementCountOrTimeTrigger<W> of(long maxElements, long timeoutMs) {
return new ElementCountOrTimeTrigger<>(maxElements, timeoutMs);
}
}
它产生的日志:
Registered timer until 1683405995456
Removed the timer due to new element for time: 1683405995456
Registered timer until 1683405997053
Removed the timer due to new element for time: 1683405997053
Registered timer until 1683405999054
Removed the timer due to new element for time: 1683405999054
Registered timer until 1683406001054
Removed the timer due to new element for time: 1683406001054
Registered timer until 1683406003054
Removed the timer due to new element for time: 1683406003054
Registered timer until 1683406005055
Processing time called for time: 1683405995456 and last timer expire: 1683406005055
Firing and purging
Processing time called for time: 1683405997053 and last timer expire: 1683406005055
Firing and purging
Removed the timer due to new element for time: 1683406005055
Registered timer until 1683406007055
Removed the timer due to new element for time: 1683406007055
Registered timer until 1683406009055
Processing time called for time: 1683405999054 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406001054 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406003054 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406005055 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406007055 and last timer expire: 1683406009055
Firing and purging
Processing time called for time: 1683406009055 and last timer expire: 1683406009055
Firing and purging
1条答案
按热度按时间zf2sa74q1#
您还没有注册处理时间计时器,因此永远不会调用
onProcessingTime
。您可以在内置的
ProcessingTimeTrigger
上对自定义触发器的超时部分进行建模,您可以在这里找到:https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java另请参阅这个答案,它提供了一个非常相似的完整示例:https://stackoverflow.com/a/49895802/2000823