我想知道,如果 RichFunction
,Flink会打电话来 close
对于异常之后的每个运算符,即使异常是在运算符的 open
函数,如果flink会调用 open
再次尝试从检查点恢复作业。
对于context,我有一个filter操作符,每个worker jvm应该有一个类的示例,所以我使用一个静态原子引用
private static final AtomicReference<MyClass> SINGLETON = new AtomicReference<>(null);
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
SINGLETON.compareAndSet(null, new MyClass());
SINGLETON.get().open(); // synchronized
}
@Override
public boolean filter(JsonNode value) {
...
SINGLETON.get().doSomething();
...
}
@Override
public void close() throws Exception {
MyClass singleton = SINGLETON.getAndSet(null);
if (singleton != null) singleton.close();
}
然而,我见过这样的系统 SINGLETON.get().doSomething()
最后扔了一个 NullPointerException
. 日志中唯一突出的一点是,作业最初失败并重新启动,因为消息总线没有准备好(filter操作符本身没有抛出任何异常),我不明白为什么singleton有时会变成null。
暂无答案!
目前还没有任何答案,快来回答吧!