我有两个来源,Kafka和hbase。在Kafka,只有24小时的数据流。在hbase中,从一开始就有一个聚合数据。我的目的是当某个会话的流输入(kafka)发生时,两个数据在流处理上合并。我尝试了两种方法,但由于性能原因不满意。
经过一番搜索,我对键控过程函数中的状态有了一个想法。想法就在下面(使用键控进程函数的状态进行缓存
使用会话信息对键控进程函数进行输入
检查键控进程的状态
如果状态未初始化->则从hbase查询并初始化到状态->转到5
否则(状态已初始化)->转到5
使用状态执行业务逻辑
在编写这个想法的过程中,我遇到了一个性能问题,即使用sync方式查询hbase的速度很慢。所以,我尝试了异步版本,但很复杂。
我面临两个问题。其中一个是processelement和hbase async worker线程之间的线程安全问题,另一个是processelement函数结束后(不是hbase async worker结束)进程函数的上下文过期。
val sourceStream = env.addsource(kafkaConsumer.setStartFromGroupOffsets())
sourceStream.keyBy(new KeySelector[InputMessage, KeyInfo]() {
override def getKey(v: InputMessage): KeyInfo = v.toKeyInfo()
})
.process(new KeyedProcessFunction[KeyInfo, InputMessage, OUTPUTTYPE]() {
var state: MapState[String, (String, Long)] = _
override def open(parameters: Configuration): Unit = {
val conn = ConnectionFactory.createAsyncConnection(hbaseConfInstance).join
table = conn.getTable(TableName.valueOf("tablename"))
state = getRuntimeContext.getMapState(stateDescripter)
}
def request(action: Consumer[CacheResult] ): Unit = {
if ( !state.isEmpty ) {
action.accept(new CacheResult(state))
}
else { // state is empty, so load from hbase
table.get(new Get(key)).thenAccept((hbaseResult: Result) => {
// this is called by worker thread
hbaseResult.toState(state) // convert from hbase result into state
action.accept(new CacheResult(state))
}
}
}
override def processElement(value: InputMessage
, ctx: KeyedProcessFunction[KeyInfo, InputMessage, OUTPUTTYPE]#Context
, out: Collector[OUTPUTTYPE]): Unit = {
val businessAction = new Consumer[CacheResult]() {
override def accept(t: CacheResult): Unit = {
// .. do business logic here.
out.collect( /* final result */ )
}
}
request(businessAction)
}
}).addSink()
有没有建议让keyedprocessfunction在第三方异步调用中可用?
或者在《Flink》中使用混合的Kafka和hbase的其他想法?
暂无答案!
目前还没有任何答案,快来回答吧!