scala—每次我在hashmap中放入键值时,hashmap中的值都会被重写

qfe3c7zg  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(367)

我的flink计划如下:

object WindowedWordCount {
    class MyCoMap extends RichCoMapFunction[String, String, String] {

      private var usersMap: MapState[String, String] = _

      override def open(parameters: Configuration): Unit = {
          usersMap = getRuntimeContext.getState(
          //Error here
          new MapStateDescriptor[String, String]("usersMap", TypeInformation[String], TypeInformation[String)
      )}

      override def map1(value: String): String = {
        //Grab userId and timeStampOfUserId here
        usersMap.put(userId,timeStampOfUserId)
        println(usersMap)
        "Adding user from event stream to global map: " + timeStampOfUserId
      }

      override def map2(value: String): String = {
          //Do something with the second stream
      }
    }
}

我在上面代码中遇到的问题是 map1 函数内部 MyCoMap 当我添加 key-value 给我的 users HashMap 全球定义,出于某种原因 key-value 我补充说,它覆盖了前面的值。所以我的Map上总是有1 key-value 在里面。。。
例如,如果它添加 users.put("user1", "") Map将包含 {"user1"=""} 然后再放一个
key-value users.put("user2", "") Map将变为 {"test2"=""} 而不是 {"test1"="", "test2"=""} .
编辑:
是的,我想加入userid。stream1每秒传入一条消息,而stream2每秒传入一条消息。我想将stream1中的userid添加到 MapState (在 map1 函数),然后在 map2 函数我想检查 userID 来自stream2的 MapSate .
我在这个领域缺乏知识,但是当你说keyed state时,我假设所有相同的密钥都将在同一个节点上正确处理,并且每个节点都有自己的密钥 MapState 反对?因为我主要关心的是,当我加入来自stream2的userid时,我想确保我收集了来自stream1的所有userid,并且在 MapState 在map2函数中发生。

syqv5f0l

syqv5f0l1#

如果我没弄错你的要求,你想在同一条线上加入两条流 user .
这可以通过在 user 字段:

val stream1: DataStream[(String, String)] = ??? // first field is user
val stream2: DataStream[(String, String)] = ??? // first field is user

val connected: DataStream[X] = stream1
  .connect(stream2) // connect both streams
  .keyBy(_._1, _._1) // key both streams on first field
  .map(new YourCoMapFunction())

这个 CoMapFunction 应该使用keyed state,即,在与您相同的属性上设置关键帧的状态 keyBy on(在您的案例中为用户)。所以键控状态本质上是一个分片的、分布式的 HashMap . 请注意,键控状态只能在本地访问。在使用状态时,必须确保删除所有不再需要的状态。否则,你的州在某个时候可能会变得太大。

相关问题