如何清除flink流状态的非活动键?

z2acfund  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(503)

我的目标是有一个flink流程序,它保存最后n个id,其中id是从事件中提取的。接收器是一个cassandra存储,因此可以随时获取id列表。重要的是,Cassandra是立即更新后,每一个事件。
这可以通过 mapWithState (见下面的代码)。但是,这个代码有一个重要的问题。国家的关键是 userid . 有些用户可能会处于活动状态一段时间,然后就再也不会了。我担心的是,状态存储将永远增长。
如何清除非活动键的状态?

case class MyEvent(userId: Int, id: String)

env
  .addSource(new FlinkKafkaConsumer010[MyEvent]("vips", new MyJsonDeserializationSchema(), kafkaConsumerProperties))
  .keyBy(_.userId)
  .mapWithState[(Int, Seq[String]), Seq[String]] { (in: MyEvent, currentIds: Option[Seq[String]]) =>
    val keepNIds = currentIds match {
      case None => Seq(in.id)
      case Some(cids) => (cids :+ in.id).takeRight(100)
    }
    ((in.userId, keepNIds), Some(keepNIds))
  }
  .addSink { in: (Int, Seq[String]) =>
    CassandraSink.appDatabase.idsTable.store(...)
  }
xdyibdwo

xdyibdwo1#

生长状态是一个重要而正确的观察。如果您的键空间正在移动,则肯定会发生这种情况。
flink 1.2.0增加了 ProcessFunction 解决了这个问题。一 ProcessFunction 类似于 FlatMapFunction 但可以使用计时器服务。您可以注册调用 onTimer() 当它们过期时回调函数。回调可以用来清除状态。

相关问题