我有一个flink应用程序用于点击流收集和处理。该应用程序由kafka作为事件源、map函数和sink组成,如下图所示:
我想根据从kafka摄取的原始事件中的userip字段,使用用户的ip位置来丰富传入的click流数据。
csv文件的简化切片,如下所示
start_ip,end_ip,country
"1.1.1.1","100.100.100.100","United States of America"
"100.100.100.101","200.200.200.200","China"
我做了一些研究,发现了一些可能的解决方案:
1.解决方案:广播丰富数据,并通过一些ip匹配逻辑与事件流连接。
1.结果:它适用于一对样本ip位置数据,但不适用于整个csv数据。jvm堆已达到3.5GB,由于广播状态,无法将广播状态放入磁盘(对于rocksdb)
2.解决方案:加载csv数据 open()
中的方法 RichFlatMapFunction
进入状态(valuestate),然后开始事件处理,并在中丰富事件数据 flatMap
方法。
2.结果:由于扩展数据太大,无法存储在jvm堆中,因此无法加载到valuestate。而且,通过valuestate进行反序列化对于键值性质的数据来说也是一种不好的做法。
3.解决方案:为了避免处理jvm堆约束,我尝试用mapstate将扩展数据作为state放入rocksdb(uses disk)。
3.结果:尝试将csv文件加载到中的mapstate open()
方法,给了我一个错误,告诉我不能在中放入mapstate open()
方法,因为我不在 open()
方法:flink键控流密钥为空
4.解决方案:由于mapstate需要keyed上下文(放入rocksdb),在将数据流放入keyedstream后,我尝试在process函数中将整个csv文件加载到本地rocksdb示例(磁盘)中:
class KeyedIpProcess extends KeyedProcessFunction[Long, Event, Event] {
var ipMapState: MapState[String, String] = _
var csvFinishedFlag: ValueState[Boolean] = _
override def processElement(event: Event,
ctx: KeyedProcessFunction[Long, Event, Event]#Context,
out: Collector[Event]): Unit = {
val ipDescriptor = new MapStateDescriptor[String, String]("ipMapState", classOf[String], classOf[String])
val csvFinishedDescriptor = new ValueStateDescriptor[Boolean]("csvFinished", classOf[Boolean])
ipMapState = getRuntimeContext.getMapState(ipDescriptor)
csvFinishedFlag = getRuntimeContext.getState(csvFinishedDescriptor)
if (!csvFinishedFlag.value()) {
val csv = new CSVParser(defaultCSVFormat)
val fileSource = Source.fromFile("/tmp/ip.csv", "UTF-8")
for (row <- fileSource.getLines()) {
val Some(List(start, end, country)) = csv.parseLine(row)
ipMapState.put(start, country)
}
fileSource.close()
csvFinishedFlag.update(true)
}
out.collect {
if (ipMapState.contains(event.userIp)) {
val details = ipMapState.get(event.userIp)
event.copy(data =
event.data.copy(
ipLocation = Some(details.country)
))
} else {
event
}
}
}
}
4.结果:由于阻止文件读取操作,它太黑并且阻止事件处理。
你能告诉我在这种情况下我能做些什么吗?
谢谢
1条答案
按热度按时间a9wyjsp71#
您可以实现一个自定义分区器,并将扩展数据的一部分加载到每个分区中。这里有一个这样的例子;我将摘录一些关键部分:
工作组织如下:
自定义分区器需要知道有多少个分区,并确定地将每个事件分配给特定的分区:
然后,扩展函数利用知道如何进行分区的优势,只将相关切片加载到每个示例中:
请注意,扩展没有在键控流上进行,因此不能在扩展函数中使用键控状态或计时器。