有没有办法异步修改flink keyedprocessfunction中的状态?

ruarlubt  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(303)

我有两个来源,Kafka和hbase。在Kafka,只有24小时的数据流。在hbase中,从一开始就有一个聚合数据。我的目的是当某个会话的流输入(kafka)发生时,两个数据在流处理上合并。我尝试了两种方法,但由于性能原因不满意。
经过一番搜索,我对键控过程函数中的状态有了一个想法。想法就在下面(使用键控进程函数的状态进行缓存
使用会话信息对键控进程函数进行输入
检查键控进程的状态
如果状态未初始化->则从hbase查询并初始化到状态->转到5
否则(状态已初始化)->转到5
使用状态执行业务逻辑
在编写这个想法的过程中,我遇到了一个性能问题,即使用sync方式查询hbase的速度很慢。所以,我尝试了异步版本,但很复杂。
我面临两个问题。其中一个是processelement和hbase async worker线程之间的线程安全问题,另一个是processelement函数结束后(不是hbase async worker结束)进程函数的上下文过期。

val sourceStream = env.addsource(kafkaConsumer.setStartFromGroupOffsets())

sourceStream.keyBy(new KeySelector[InputMessage, KeyInfo]() { 
  override def getKey(v: InputMessage): KeyInfo = v.toKeyInfo()
})
  .process(new KeyedProcessFunction[KeyInfo, InputMessage, OUTPUTTYPE]() {

    var state: MapState[String, (String, Long)] = _

    override def open(parameters: Configuration): Unit = {
      val conn = ConnectionFactory.createAsyncConnection(hbaseConfInstance).join
      table = conn.getTable(TableName.valueOf("tablename"))

      state = getRuntimeContext.getMapState(stateDescripter)
    }

    def request(action: Consumer[CacheResult] ): Unit = {
      if ( !state.isEmpty ) {
        action.accept(new CacheResult(state))
      }
      else {  // state is empty, so load from hbase
        table.get(new Get(key)).thenAccept((hbaseResult: Result) => {
          // this is called by worker thread
          hbaseResult.toState(state)  // convert from hbase result into state
          action.accept(new CacheResult(state))
        }
      }
    }

    override def processElement(value: InputMessage
                           , ctx: KeyedProcessFunction[KeyInfo, InputMessage, OUTPUTTYPE]#Context
                           , out: Collector[OUTPUTTYPE]): Unit = {
      val businessAction = new Consumer[CacheResult]() {
        override def accept(t: CacheResult): Unit = {
          // .. do business logic here.
          out.collect( /* final result */ )
        }
      }

      request(businessAction)

    }
}).addSink()

有没有建议让keyedprocessfunction在第三方异步调用中可用?
或者在《Flink》中使用混合的Kafka和hbase的其他想法?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题