同步执行Flink DataStream[String],但异步对每条消息进行多次更新

zvokhttg  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(154)
val updatedDataStream = dataStream.map(new MyMapFunction)

我使用map()而不是Flink的原生AsyncDataStream,因为我想同步地处理消息。AsyncDataStream. unorderedWait或orderedWait将异步地处理消息。
在下面的代码中,使用2次更新来更新dataStream中的每个消息,但是这2次更新是异步完成的,因此两次更新的总时间等于最慢更新所花费的时间。

class MyMapFunction extends RichMapFunction[String, String]{

  private var client: AsyncClient = _

  override def open(parameters: Configuration): Unit = {
    client = new AsyncClient
  }

  override def map(value: String): String = {
    if (value.nonEmpty) {
      // below line de-serializes json message to an parsable object
      val a = objectMapper.readValue(value, classOf[Test])

      // below function calls (firstUpdate and secondUpdate) return back Future[String]
      val firstFieldValue = client.firstUpdate()
      val secondFieldValue = client.secondUpdate()

      def updateRecord(r1: String, r2: String): String = {
        a.firstField = r1
        a.secondField = r2
        // below line serializes object back to a json String
        objectMapper.writeValueAsString(a)
      }

      val enrichment = for {
        r1 <- firstFieldValue
        r2 <- secondFieldValue
      } yield (updateRecord(r1, r2))

      val f = enrichment.onComplete {
        case Success(result) => result
        case Failure(exception) => exception
      }

    } else ""
  }

}

问题:onComplete返回Unit时,这将不起作用。但我希望它返回result(字符串),这样我就可以将其发送回updatedDataStream

omhiaaxx

omhiaaxx1#

Since map has a synchronous signature, you'll have to block. Await.result blocks until the future completes.

// instead of val f = enrichment.onComplete ...
Await.result(enrichment, Duration.Inf)

Note that blocking like this may limit throughput, though if r1 and r2 are able to execute in parallel, this period of blocking will likely be shorter than the time the thread invoking map would be blocked if done synchronously.

相关问题