Process a Flink DataStream[String] synchronously, but update each message asynchronously several times

zvokhttg posted on 2022-12-09 in Apache
val updatedDataStream = dataStream.map(new MyMapFunction)

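I'm using map() instead of Flink's native AsyncDataStream because I want the messages themselves to be processed synchronously. AsyncDataStream.unorderedWait or orderedWait would process the messages asynchronously.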

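import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import scala.concurrent.ExecutionContext.Implicits.global // assumption: the global pool runs the Future callbacks
import scala.util.{Failure, Success}

class MyMapFunction extends RichMapFunction[String, String] {

  private var client: AsyncClient = _
  // assumption: objectMapper is a Jackson ObjectMapper, created per task in open()
  private var objectMapper: ObjectMapper = _

  override def open(parameters: Configuration): Unit = {
    client = new AsyncClient
    objectMapper = new ObjectMapper()
  }

  override def map(value: String): String = {
    if (value.nonEmpty) {
      // deserialize the JSON message into a Test object
      val a = objectMapper.readValue(value, classOf[Test])

      // firstUpdate and secondUpdate each return a Future[String]; both calls start here
      val firstFieldValue = client.firstUpdate()
      val secondFieldValue = client.secondUpdate()

      def updateRecord(r1: String, r2: String): String = {
        a.firstField = r1
        a.secondField = r2
        // serialize the object back to a JSON String
        objectMapper.writeValueAsString(a)
      }

      val enrichment = for {
        r1 <- firstFieldValue
        r2 <- secondFieldValue
      } yield updateRecord(r1, r2)

      // problem: onComplete only registers a callback and returns Unit, so this
      // branch never produces the String that map() must return
      val f = enrichment.onComplete {
        case Success(result) => result
        case Failure(exception) => exception
      }
    } else ""
  }
}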
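To see the types involved without Flink or Jackson, here is a minimal sketch of the same enrichment shape. Record, firstUpdate and secondUpdate are hypothetical stand-ins (an immutable case class and two already-completed-soon futures), not the asker's Test or AsyncClient. It shows that the for-comprehension still yields a Future, and that onComplete itself returns Unit.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object EnrichmentTypes {
  // Hypothetical record type: an immutable stand-in for the question's Test class.
  final case class Record(firstField: String, secondField: String)

  // Hypothetical stand-ins for AsyncClient.firstUpdate / secondUpdate.
  def firstUpdate(): Future[String] = Future("one")
  def secondUpdate(): Future[String] = Future("two")

  // Same shape as the question: start both calls, then combine them.
  // The result is still a Future; nothing here waits for it.
  def enrich(base: Record): Future[Record] = {
    val first = firstUpdate()
    val second = secondUpdate()
    for {
      r1 <- first
      r2 <- second
    } yield base.copy(firstField = r1, secondField = r2)
  }

  // onComplete registers a callback and returns immediately with Unit;
  // whatever the callback computes is discarded.
  val callbackResult: Unit =
    enrich(Record("", "")).onComplete(_ => ())
}
```

Getting a plain value out of enrich therefore requires waiting on the Future, which is what the answer below does.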

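Since map has a synchronous signature (it must return the String itself, not a Future[String]), you'll have to block. Await.result blocks the calling thread until the future completes, then returns its value, or rethrows its exception if it failed.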

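// instead of val f = enrichment.onComplete ...
// blocks until both updates finish; the returned String becomes map()'s result
// (requires scala.concurrent.Await and scala.concurrent.duration.Duration)
Await.result(enrichment, Duration.Inf)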
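A small sketch of how Await.result behaves in the three cases that matter here: success, a failed future, and a timeout. blockFor and failingUpdate are hypothetical helpers for illustration; Await.result, Future.successful and Future.never are standard library calls.

```scala
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.control.NonFatal

object AwaitDemo {
  // Blocks the calling thread until `f` completes or `timeout` elapses.
  // Success -> Right(value). A failed future's exception is rethrown by
  // Await.result and captured here as Left, as is a TimeoutException.
  def blockFor[T](f: Future[T], timeout: Duration): Either[Throwable, T] =
    try Right(Await.result(f, timeout))
    catch { case NonFatal(e) => Left(e) }

  // Hypothetical update call that fails on the global pool.
  def failingUpdate(): Future[String] =
    Future { throw new IllegalStateException("update failed") }
}
```

Duration.Inf, as used in the answer, never gives up. A finite timeout is one possible alternative: a stuck update then surfaces as a TimeoutException thrown from map, which fails the Flink task instead of stalling it indefinitely. Which behavior is preferable depends on the job's restart strategy.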

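Note that blocking like this limits throughput: each parallel instance of the map operator handles one record at a time, so it can process at most about one record per round trip of the slower update. Still, because firstFieldValue and secondFieldValue are both started before the for-comprehension, r1 and r2 can be computed in parallel, and the time map spends blocked is roughly that of the slower call rather than the sum it would be if the two updates were made synchronously, one after the other.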
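The parallel-versus-sequential point can be checked with plain Scala futures. In this sketch slowUpdate is a hypothetical stand-in for a remote call that sleeps for an arbitrary 200 ms; the only difference between the two methods is whether the futures are created before the for-comprehension or inside it.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelVsSequential {
  // Hypothetical stand-in for a remote update: waits `millis`, then returns `name`.
  // `blocking` lets the global pool add a thread while this one sleeps.
  def slowUpdate(name: String, millis: Long): Future[String] =
    Future { blocking { Thread.sleep(millis) }; name }

  // Runs `body`, blocks on it, and reports the result plus elapsed milliseconds.
  private def timed(body: => Future[String]): (String, Long) = {
    val start = System.nanoTime()
    val result = Await.result(body, 10.seconds)
    (result, (System.nanoTime() - start) / 1000000)
  }

  // Both futures are created before the for-comprehension, so both calls are
  // already running while we wait: elapsed time is roughly the slower call.
  def parallel(): (String, Long) = timed {
    val f1 = slowUpdate("first", 200)
    val f2 = slowUpdate("second", 200)
    for { r1 <- f1; r2 <- f2 } yield s"$r1,$r2"
  }

  // The second call is created inside flatMap, so it only starts after the
  // first completes: elapsed time is roughly the sum of both calls.
  def sequential(): (String, Long) = timed {
    for {
      r1 <- slowUpdate("first", 200)
      r2 <- slowUpdate("second", 200)
    } yield s"$r1,$r2"
  }
}
```

The question's code already follows the parallel form, assuming AsyncClient starts each request when firstUpdate or secondUpdate is called (Scala futures are eager, so that is the usual case).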
