我使用Kafka0.10.1.0。
这是我的制作人:
val props: Properties = ...
val producer = new KafkaProducer[String, AnyRef](props)
val callback = new Callback {
override def onCompletion(md: RecordMetadata, e: Exception): Unit = ...
}
producer.send(new ProducerRecord[String, AnyRef]("topic", "hello"), callback)
但是上面的回调无法处理 java.net.ConnectException: Connection refused
以防Kafka服务器关闭。
升级版
这个 ConnectionException
在另一个线程中上升(进入 Sender
用于 KafkaProducer
). 因此我们不能使用 try {} catch
为了它。另外,我不需要重试机制,我需要一种处理这种情况的方法(例如,如果kafka关闭,生产者无法发送消息,那么我将使用另一个队列api)。
有办法处理这个异常吗?
1条答案
按热度按时间kq4fsx7k1#
你有几个选择。scala提供了一种捕获异常的方法,其形式如下:
所以最简单的方法是:
更复杂但更强大的是重试机制:
scala实现这种可重试调用的方法是什么?
另请注意,kafka producer内置了重试机制,这可能也会有帮助:
设置一个大于零的值将导致客户机重新发送其发送失败的任何记录,并可能出现暂时性错误。请注意,此重试与客户端在收到错误时重新发送记录没有什么不同。允许重试而不将max.in.flight.requests.per.connection设置为1可能会更改记录的顺序,因为如果将两个批发送到单个分区,并且第一个失败并重试,但第二个成功,则第二个批中的记录可能会首先出现。