假设一个例子,输入是一个文件名,我们想使用flink RichFlatMapFunction
来更新文件的状态和输出行(每个文件包含大约10k行)。我想知道我应该在哪里更新状态以确保一次交付。这里有2个解决方案:
// solution 1
class MyOp extends RichFlatMapFunction {
...
def flatMap(filename: String, out: Collector[String]): Unit = {
val state = Option(flinkState.value()).getOrElse(defaultState)
for (line <- read(filename)) {
state.update(line)
flinkState.update(state)
out.collect(line)
}
}
}
// solution 2
class MyOp extends RichFlatMapFunction {
...
def flatMap(filename: String, out: Collector[String]): Unit = {
val state = Option(flinkState.value()).getOrElse(defaultState)
for (line <- read(filename)) {
flinkState.update(state)
out.collect(line)
}
state.update(line)
}
}
1条答案
按热度按时间yebdmbv41#
就正确性而言,这没有任何区别。检查点从来不会在调用用户函数(如
RichFlatMapFunction
)时出现,因此检查点将反映处理传递给flatMap
方法的事件之前或之后的状态。在性能方面,解决方案2要好得多。