在apache flink中,我们应该在每次收集或每次输入时更新状态吗?

omqzjyyz  于 2023-04-10  发布在  Apache
关注(0)|答案(1)|浏览(176)

假设一个例子,输入是一个文件名,我们想使用flink RichFlatMapFunction来更新文件的状态和输出行(每个文件包含大约10k行)。我想知道我应该在哪里更新状态以确保一次交付。这里有2个解决方案:

// solution 1
class MyOp extends RichFlatMapFunction {
 ...
 def flatMap(filename: String, out: Collector[String]): Unit = {
    val state = Option(flinkState.value()).getOrElse(defaultState)
    for (line <- read(filename)) {
      state.update(line)
      flinkState.update(state)
      out.collect(line)
    }
 }
}
// solution 2
class MyOp extends RichFlatMapFunction {
 ...
 def flatMap(filename: String, out: Collector[String]): Unit = {
    val state = Option(flinkState.value()).getOrElse(defaultState)
    for (line <- read(filename)) {
      flinkState.update(state)
      out.collect(line)
    }
    state.update(line)
 }
}
yebdmbv4

yebdmbv41#

就正确性而言,这没有任何区别。检查点从来不会在调用用户函数(如RichFlatMapFunction)时出现,因此检查点将反映处理传递给flatMap方法的事件之前或之后的状态。
在性能方面,解决方案2要好得多。

相关问题