scala:如何捕获通过kafka异步发送的所有多条消息的状态?

nr9pn0ug  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(375)

我正在使用scala向kafka发布消息。我有一个消息列表要发布到Kafka,可能是异步的。但是,最后,如果在发布至少一条消息时出现任何错误,我必须捕获该错误并采取措施。我知道回调方法,但这将工作一个消息。我想积累或找出所有这些状态的方式,并决定是否没有失败。类似于批处理状态。

for (message <- messageList) {
    // Create a message
    // producer is created
    // I have a separate util package, having the MyCallback class with overridden onCompletion method.
    producer.send(new ProducerRecord[String, MyMessage](config("topic"), message ), new MyCallback(config))

}
// here, I need help, to see if any of the messages sent is failed or not.
// How to capture the statuses of callback here for entire batch?

// Separate class file
class MyCallback (config: Map[String, String]) extends Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
     // if successful meta data, do something
     // if exception, I can log the error.

    }
}

基本上,我可以在回调类方法中检查单个消息的状态。
如何在for循环完成后跟踪这些状态,并在此基础上采取进一步的操作?

vkc1a9a2

vkc1a9a21#

如果你搬家 new MyCallback(config) 在for循环之前,这可能会有所帮助。
然后有一个示例,可以将异常收集到其中。
另外,回调对象将在循环之后的作用域中,您可以检索状态列表
我的斯卡拉有点生 rust 了,但我想这说明了我的想法

class MyCallback (config: Map[String, String]) extends Callback {

    private val exceptions = _ // some mutable List

    def getExceptions() { return exceptions }

    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
     if (exception != null) {
         exceptions.add(exception)
     }
    }
}

然后

val cb = new MyCallback(config)
for (message <- messageList) {
    producer.send(new ProducerRecord[String, MyMessage](config("topic"), message ), cb)
}

// TODO: check cb.getExceptions().size > 0

或者您可以尝试通过在循环内进行检查来提前失败,但这将是异步的,并且可能无法很好地工作

相关问题