kafka producer:如何处理“java.net.connectexception:连接被拒绝”

ia2d9nvy  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(622)

我使用Kafka0.10.1.0。
这是我的制作人:

val props: Properties = ...
val producer = new KafkaProducer[String, AnyRef](props)
val callback = new Callback {
   override def onCompletion(md: RecordMetadata, e: Exception): Unit = ...
}
producer.send(new ProducerRecord[String, AnyRef]("topic", "hello"), callback)

但是上面的回调无法处理 java.net.ConnectException: Connection refused 以防Kafka服务器关闭。
升级版
这个 ConnectionException 在另一个线程中上升(进入 Sender 用于 KafkaProducer ). 因此我们不能使用 try {} catch 为了它。另外,我不需要重试机制,我需要一种处理这种情况的方法(例如,如果kafka关闭,生产者无法发送消息,那么我将使用另一个队列api)。
有办法处理这个异常吗?

kq4fsx7k

kq4fsx7k1#

你有几个选择。scala提供了一种捕获异常的方法,其形式如下:

try { 
   // ... 
   } 
   catch {
     case ioe: IOException => ... // more specific cases first !
     case e: Exception => ...
   }

所以最简单的方法是:

try { 
     producer.send(new ProducerRecord[String, AnyRef]("topic", "hello"), callback)
   } 
   catch {
     case ce: ConnectionException => // handle exception
   }

更复杂但更强大的是重试机制:
scala实现这种可重试调用的方法是什么?
另请注意,kafka producer内置了重试机制,这可能也会有帮助:
设置一个大于零的值将导致客户机重新发送其发送失败的任何记录,并可能出现暂时性错误。请注意,此重试与客户端在收到错误时重新发送记录没有什么不同。允许重试而不将max.in.flight.requests.per.connection设置为1可能会更改记录的顺序,因为如果将两个批发送到单个分区,并且第一个失败并重试,但第二个成功,则第二个批中的记录可能会首先出现。

相关问题