我正在使用scala向kafka发布消息。我有一个消息列表要发布到Kafka,可能是异步的。但是,最后,如果在发布至少一条消息时出现任何错误,我必须捕获该错误并采取措施。我知道回调方法,但这将工作一个消息。我想积累或找出所有这些状态的方式,并决定是否没有失败。类似于批处理状态。
for (message <- messageList) {
// Create a message
// producer is created
// I have a separate util package, having the MyCallback class with overridden onCompletion method.
producer.send(new ProducerRecord[String, MyMessage](config("topic"), message ), new MyCallback(config))
}
// here, I need help, to see if any of the messages sent is failed or not.
// How to capture the statuses of callback here for entire batch?
// Separate class file
class MyCallback (config: Map[String, String]) extends Callback {
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
// if successful meta data, do something
// if exception, I can log the error.
}
}
基本上,我可以在回调类方法中检查单个消息的状态。
如何在for循环完成后跟踪这些状态,并在此基础上采取进一步的操作?
1条答案
按热度按时间vkc1a9a21#
如果你搬家
new MyCallback(config)
在for循环之前,这可能会有所帮助。然后有一个示例,可以将异常收集到其中。
另外,回调对象将在循环之后的作用域中,您可以检索状态列表
我的斯卡拉有点生 rust 了,但我想这说明了我的想法
然后
或者您可以尝试通过在循环内进行检查来提前失败,但这将是异步的,并且可能无法很好地工作