我想对flink中的输入数据流应用processfunction(),用一个缓存对象处理每个传入元素。我的代码如下所示:
object myJob extends FlinkJob {
private val myCache = InMemoryCache()
private def updateCache(myCache,someValue) : Boolean = {//some code}
private def getValue(myCache,someKey) : Boolean = {//some code}
def run(params, executionEnv) : Unit = {
val myStream = executionEnv.getStream()
val processedStream = myStream.process(new ProcessFunction {
def processElement(value,context,collector) : Unit = {
//Update cache
//Collect updated event
}
}
processedStream.write()
}
}
当我并行化这个作业时,我假设作业的每个并行示例都有自己的cacheobject,因此,在多个cacheobject中可能存在一个cache键。但是,我希望对于一个特定的键有一个单独的缓存条目,也就是说,与一个特定键对应的所有记录都必须由一个示例和一个cacheobject处理。在mystream上使用keyby()是否可以确保所有传入事件(具有相同的键)都由flink作业的单个并行任务/示例处理,因此也由单个cacheobject处理?
2条答案
按热度按时间ff29svar1#
是的,keyby保证具有相同密钥的每个事件都将由同一个操作符示例处理。这对于高吞吐量、低延迟的有状态流处理至关重要。
这使得Flink所在的州成为地方性的,这使得工作更容易,速度更快。计时器也利用了这个键控分区。
使用flink的键控状态可能比使用缓存对象效果更好。
xwmevbvl2#
而不是一个对象,我认为你应该使用一个状态。
具有相同键的所有事件将访问相同的状态,从而访问相同的值。修改其中一个状态不会影响其他已设置关键帧的状态。