我的flink计划如下:
object WindowedWordCount {
class MyCoMap extends RichCoMapFunction[String, String, String] {
private var usersMap: MapState[String, String] = _
override def open(parameters: Configuration): Unit = {
usersMap = getRuntimeContext.getState(
//Error here
new MapStateDescriptor[String, String]("usersMap", TypeInformation[String], TypeInformation[String)
)}
override def map1(value: String): String = {
//Grab userId and timeStampOfUserId here
usersMap.put(userId,timeStampOfUserId)
println(usersMap)
"Adding user from event stream to global map: " + timeStampOfUserId
}
override def map2(value: String): String = {
//Do something with the second stream
}
}
}
我在上面代码中遇到的问题是 map1
函数内部 MyCoMap
当我添加 key-value
给我的 users HashMap
全球定义,出于某种原因 key-value
我补充说,它覆盖了前面的值。所以我的Map上总是有1 key-value
在里面。。。
例如,如果它添加 users.put("user1", "")
Map将包含 {"user1"=""}
然后再放一个
key-value users.put("user2", "")
Map将变为 {"test2"=""}
而不是 {"test1"="", "test2"=""}
.
编辑:
是的,我想加入userid。stream1每秒传入一条消息,而stream2每秒传入一条消息。我想将stream1中的userid添加到 MapState
(在 map1
函数),然后在 map2
函数我想检查 userID
来自stream2的 MapSate
.
我在这个领域缺乏知识,但是当你说keyed state时,我假设所有相同的密钥都将在同一个节点上正确处理,并且每个节点都有自己的密钥 MapState
反对?因为我主要关心的是,当我加入来自stream2的userid时,我想确保我收集了来自stream1的所有userid,并且在 MapState
在map2函数中发生。
1条答案
按热度按时间syqv5f0l1#
如果我没弄错你的要求,你想在同一条线上加入两条流
user
.这可以通过在
user
字段:这个
CoMapFunction
应该使用keyed state,即,在与您相同的属性上设置关键帧的状态keyBy
on(在您的案例中为用户)。所以键控状态本质上是一个分片的、分布式的HashMap
. 请注意,键控状态只能在本地访问。在使用状态时,必须确保删除所有不再需要的状态。否则,你的州在某个时候可能会变得太大。