apache-flink-实现一个状态可能非常大的流处理器

bz4sfanl  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(345)

我希望从一系列事件中投射出一个潜在的非常大的状态。这就是我如何以一种强制性的方式实现这一点:

class ImperativeFooProcessor {

  val state: mutable.Map[UUID, BarState] = mutable.HashMap.empty[UUID, BarState]

  def handle(event: InputEvent) = {
    event match {
      case FooAdded(fooId, barId) => {
        // retrieve relevant state and do some work on it
        val barState = state(barId)

        // let the world know about what may have happened
        publish(BarOccured(fooId, barId))
        // or maybe rather
        publish(BazOccured(fooId, barId))
      }
      case FooRemoved(fooId, barId) => {
        // retrieve relevant state and do some work on it
        val barState = state(barId)

        // let the world know about what may have happened
        publish(BarOccured(fooId, barId))
        // or maybe rather
        publish(BazOccured(fooId, barId))
      }
    }
  }

  private def publish(event: OutputEvent): Unit = {
    // push event to downstream sink
  }
}

在最坏的情况下,barstate的大小将随着它被用户提及的次数而增长 FooAdded 相对于每个barid的事件总数,唯一barid的数量非常少。
我将如何开始在flink中表示这个处理结构?
如何处理每个巴态都可能变得非常大的事实?

kyks70gy

kyks70gy1#

flink在所谓的状态后端中维护状态。存在状态后端( MemoryStateBackend 以及 FsStateBackend )在工作进程的jvm堆上操作的。这些后端不适合处理大型状态。
flink还有一个基于rocksdb的RockSDBStateBend。rocksdb用作每个工作节点上的本地数据库(无需将其设置为外部服务),并将状态数据写入磁盘。因此,它可以处理超过内存的非常大的状态。
Flink提供了一个 KeyedStream 它是一个按特定属性划分的流。在您的例子中,您可能希望对同一id的所有访问都转到同一状态示例,因此您可以使用 barId 作为一把钥匙。然后,状态将基于 barId . 这基本上是一个分布式键值存储或Map。因此,您不需要将状态表示为Map,因为它是由flink自动分发的。

相关问题