```scala
val updatedDataStream = dataStream.map(new MyMapFunction)
```
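I'm using `map()` instead of Flink's native `AsyncDataStream` because I want to process the messages synchronously. `AsyncDataStream.unorderedWait` or `orderedWait` would process them asynchronously.

In the code below, each message in `dataStream` is enriched with two updates. The two updates run asynchronously, so the total time for both is roughly the time taken by the slower one.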
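```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global // needed by the for-comprehension and onComplete
import scala.util.{Failure, Success}

// AsyncClient, Test and objectMapper (a JSON ObjectMapper) are defined elsewhere in my project
class MyMapFunction extends RichMapFunction[String, String] {
  private var client: AsyncClient = _

  override def open(parameters: Configuration): Unit = {
    client = new AsyncClient
  }

  override def map(value: String): String = {
    if (value.nonEmpty) {
      // deserialize the JSON message into a Test object
      val a = objectMapper.readValue(value, classOf[Test])
      // firstUpdate and secondUpdate both return Future[String]; both calls start here
      val firstFieldValue = client.firstUpdate()
      val secondFieldValue = client.secondUpdate()

      def updateRecord(r1: String, r2: String): String = {
        a.firstField = r1
        a.secondField = r2
        // serialize the object back to a JSON String
        objectMapper.writeValueAsString(a)
      }

      val enrichment: Future[String] = for {
        r1 <- firstFieldValue
        r2 <- secondFieldValue
      } yield updateRecord(r1, r2)

      // onComplete returns Unit, so this branch never yields the String result
      val f = enrichment.onComplete {
        case Success(result)    => result
        case Failure(exception) => exception
      }
    } else ""
  }
}
```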
Problem: this doesn't work, because `onComplete` returns `Unit`. I want it to return `result` (a `String`) so that I can send it back to `updatedDataStream`.
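The timing claim above can be checked with a small, Flink-free sketch. `fakeUpdate` is a hypothetical stand-in for `client.firstUpdate()` / `client.secondUpdate()` that sleeps to simulate latency; the 300 ms and 500 ms delays are arbitrary assumptions. `Await.result` is used here only to measure how long each version takes.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelUpdates {
  // Hypothetical stand-in for a remote update call that takes `millis` ms.
  def fakeUpdate(result: String, millis: Long): Future[String] =
    Future {
      blocking(Thread.sleep(millis)) // `blocking` lets the global pool add threads if needed
      result
    }

  // Both futures are created (and therefore started) before they are combined,
  // as in MyMapFunction.map, so the two calls overlap in time.
  def parallel(): Future[(String, String)] = {
    val f1 = fakeUpdate("first", 300)
    val f2 = fakeUpdate("second", 500)
    for { r1 <- f1; r2 <- f2 } yield (r1, r2)
  }

  // Creating the futures inside the for-comprehension instead means the second
  // call only starts after the first one has completed.
  def sequential(): Future[(String, String)] =
    for {
      r1 <- fakeUpdate("first", 300)
      r2 <- fakeUpdate("second", 500)
    } yield (r1, r2)

  // Returns the result together with the elapsed wall-clock time in milliseconds.
  def timed[A](f: => Future[A]): (A, Long) = {
    val start = System.nanoTime()
    val result = Await.result(f, 5.seconds)
    (result, (System.nanoTime() - start) / 1000000)
  }
}
```

`parallel()` should take about as long as the slower call, while `sequential()` takes about the sum of both, which is why it matters that the two futures are created before the for-comprehension.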
Answer
Since `map` has a synchronous signature, you'll have to block. `Await.result` blocks until the future completes. Note that blocking like this may limit throughput, though if `r1` and `r2` are able to execute in parallel, this period of blocking will likely be shorter than the time the thread invoking `map` would be blocked if the two updates were performed synchronously, one after the other.
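A minimal sketch of that suggestion, kept free of Flink so it runs on its own. `firstUpdate`, `secondUpdate` and `updateRecord` are hypothetical stand-ins for the client calls and the JSON round trip in the question, and the 10-second timeout is an arbitrary assumption. Inside `MyMapFunction.map`, the `Await.result` line would take the place of the `onComplete` call.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingEnrichment {
  // Hypothetical stand-ins for client.firstUpdate() / client.secondUpdate()
  def firstUpdate(): Future[String] = Future("first")
  def secondUpdate(): Future[String] = Future("second")

  // Hypothetical stand-in for updateRecord and the JSON (de)serialization
  def updateRecord(value: String, r1: String, r2: String): String =
    s"$value|$r1|$r2"

  // Same shape as MyMapFunction.map, but it returns a String by blocking
  // on the combined future instead of registering an onComplete callback.
  def map(value: String, timeout: FiniteDuration = 10.seconds): String =
    if (value.nonEmpty) {
      val firstFieldValue = firstUpdate()   // both calls start here and run in parallel
      val secondFieldValue = secondUpdate()
      val enrichment: Future[String] = for {
        r1 <- firstFieldValue
        r2 <- secondFieldValue
      } yield updateRecord(value, r1, r2)
      // Blocks the calling thread until both updates finish. A failed future
      // rethrows its exception here; exceeding `timeout` throws
      // java.util.concurrent.TimeoutException.
      Await.result(enrichment, timeout)
    } else ""
}
```

Because `Await.result` rethrows failures, a failed update surfaces as an exception thrown from `map` rather than being silently dropped, as it would be inside an `onComplete` callback.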