检查点故障恢复期间richfunctions的生命周期是什么?

5ktev3wc  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(230)

我想知道,如果 RichFunction ,Flink会打电话来 close 对于异常之后的每个运算符,即使异常是在运算符的 open 函数,如果flink会调用 open 再次尝试从检查点恢复作业。
对于context,我有一个filter操作符,每个worker jvm应该有一个类的示例,所以我使用一个静态原子引用

private static final AtomicReference<MyClass> SINGLETON = new AtomicReference<>(null);

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    SINGLETON.compareAndSet(null, new MyClass());
    SINGLETON.get().open(); // synchronized
}

@Override
public boolean filter(JsonNode value) {
    ...
    SINGLETON.get().doSomething();
    ...
}

@Override
public void close() throws Exception {
    MyClass singleton = SINGLETON.getAndSet(null);
    if (singleton != null) singleton.close();
}

然而,我见过这样的系统 SINGLETON.get().doSomething() 最后扔了一个 NullPointerException . 日志中唯一突出的一点是,作业最初失败并重新启动,因为消息总线没有准备好(filter操作符本身没有抛出任何异常),我不明白为什么singleton有时会变成null。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题