flink获取runtimecontext

x9ybnkn6  于 2021-06-24  发布在  Flink
关注(0)|答案(2)|浏览(981)

在我的flink代码中,我使用的是一种自定义输入格式,它引发了一个异常。看来我需要一个 RuntimeContext ,但是我怎么能得到一个呢?
我的格式类如下所示:

MyInputFormat extends org.apache.flink.api.common.io.DelimitedInputFormat[T]{
@transient var lineCounter: IntCounter = _
override def open(split: FileInputSplit): Unit = {
    super.open(split)
    lineCounter = new IntCounter()
    getRuntimeContext.addAccumulator("rowsInFile", lineCounter) // this line throws IllegalStateException

我的主程序如下所示:

val env = ExecutionEnvironment.getExecutionEnvironment
val format = new MyInputFormat
env.readFile(format, inputFile.getAbsolutePath) // throws exception

引发的异常:

java.lang.IllegalStateException: The runtime context has not been initialized yet. Try accessing it in one of the other life cycle methods.
    at org.apache.flink.api.common.io.RichInputFormat.getRuntimeContext(RichInputFormat.java:51)

我的班级需要一个 RuntimeContext 因为它延伸了 DelimitedInputFormat 延伸到。。。 RichInputFormat ```
public abstract class DelimitedInputFormat extends FileInputFormat
public abstract class FileInputFormat extends RichInputFormat<OT, FileInputSplit>
public abstract class RichInputFormat<OT, T extends InputSplit> implements InputFormat<OT, T>
private transient RuntimeContext runtimeContext;
public void setRuntimeContext(RuntimeContext t)
public RuntimeContext getRuntimeContext()

所以任何 `RichInputFormat` 希望我们 `setRuntimeContext(RuntimeContext t)` 在它被创建之后。
我想我应该做以下工作:

val env = ExecutionEnvironment.getExecutionEnvironment
val runtimeContext: RuntimeContext = ??? // How do I get this?
val format = new MyInputFormat
format.setRuntimeContext(runtimeContext)
env.readFile(format, inputFile.getAbsolutePath) // no longer throws exception

但是如何获取runtimecontext的示例呢?由于我的自定义输入格式没有 `RuntimeContext` . 我想定一个,但我不知道从哪儿弄到。
qyswt5oh

qyswt5oh1#

我还不明白为什么,但看起来 MyInputFormat 多次示例化,包括 RuntimeContext 是可用的。然而,尽管如此,这个作业仍然工作并计算它需要做什么。我已经解决了这个问题,将所有对 addAccumulator(,) 在一个 try ,就像这样:

private def addAccumulator(accName: String, acc: SimpleAccumulator[_]): Unit = {
    try {
      val rc = getRuntimeContext.getAccumulator(accName) // throws if RuntimeContext not yet set
      if (rc == null) getRuntimeContext.addAccumulator(accName, acc)
    } catch {
      case NonFatal(_) =>
    }
  }

尽管我打电话来,我还是要这么做 addAccumulator(,) 内部 open() ,这似乎是正确的生命周期方法。另外:由于并行性,几个子作业试图添加相同的累加器,这是错误的。这就是为什么我要先得到累加器。如果还没有上下文,没问题:我稍后会得到一个。如果累加器已经存在,没问题-没什么可做的。这只是一个解决办法,不是一个解决方案-但这就是我现在所拥有的。

zzoitvuj

zzoitvuj2#

您应该在生命周期方法中初始化runtimecontext,如 open ```
MyInputFormat extends org.apache.flink.api.common.io.DelimitedInputFormat[T] {

override def openInputFormat() = {
getRuntimeContext.addAccumulator("rowsInFile", lineCounter)
}

相关问题