提供意外输出的flink自定义触发器

p4tfgftt  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(320)

我想创建一个 Trigger 它第一次在20秒内发射,之后每5秒发射一次。我用过 GlobalWindows 还有一种习俗 Trigger ```
val windowedStream = valueStream
.keyBy(0)
.window(GlobalWindows.create())
.trigger(TradeTrigger.of())

这是密码 `TradeTrigger` :

@PublicEvolving
public class TradeTrigger extends Trigger<Object, W> {

private static final long serialVersionUID = 1L;

static boolean flag=false;
static long ctime = System.currentTimeMillis();

private TradeTrigger() {
}

@Override
public TriggerResult onElement(
        Object arg0,
        long arg1,
        W arg2,
        org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3)
        throws Exception {
    // TODO Auto-generated method stub

    if(flag == false){
        if((System.currentTimeMillis()-ctime) >= 20000){
           flag = true;
           ctime = System.currentTimeMillis();
           return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    } else {
        if((System.currentTimeMillis()-ctime) >= 5000){
            ctime = System.currentTimeMillis();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

}

@Override
public TriggerResult onEventTime(
        long arg0,
        W arg1,
        org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
        throws Exception {
    // TODO Auto-generated method stub
    return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(
        long arg0,
        W arg1,
        org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
        throws Exception {
    // TODO Auto-generated method stub
    return TriggerResult.CONTINUE;
}

public static <W extends Window> TradeTrigger<W> of() {
    return new TradeTrigger<>();
}

}

所以基本上,什么时候 `flag` 是 `false` ,即第一次 `Trigger` 应该在20秒内被炒鱿鱼 `flag` 至 `true` . 从下一次开始,它应该每5秒被发射一次。
我面临的问题是,每次 `Trigger` 被解雇了。也就是说,我在20秒后收到一条消息,每5秒收到一条消息。我期望每次触发时输出20条消息。
如果我使用 `.timeWindow(Time.seconds(5))` 创建一个5秒的时间窗口,每5秒输出20条消息。请帮我把代码弄好。有什么我不知道的吗?
gr8qqesn

gr8qqesn1#

在费边和Flink邮件列表的帮助下,它成功了。将状态存储在 ValueState 变量通过 TriggerContext . 签入变量 onEvent() 方法,如果是第一次注册 processingTimeTimer 超过当前时间20秒并更新状态。在 onProcessingTime 方法,注册另一个 ProcessingTimeTimer 比当前时间多5秒,更新状态并触发 Window .

1l5u6lss

1l5u6lss2#

触发器实现有几个问题:
永远不要将函数的状态存储在静态变量中。flink不会隔离JVM中的用户进程。相反,它为每个taskmanager使用一个jvm,并启动多个线程。因此,静态布尔标志在触发器的多个示例之间共享。你应该把Flink的国旗藏起来 ValueState 可从 TriggerContext . flink会注意检查你的状态,并在失败时恢复它。 Trigger.onEvent() 仅在新事件到达时调用。因此它不能用于在特定时间触发窗口计算。相反,您应该注册事件时间计时器或处理时间计时器(再次通过 TriggerContext ). 计时器将调用 Trigger.onEventTime() 或者 Trigger.onProcessingTime() 分别。是使用事件还是处理时间取决于您的用例。

相关问题