Why is the exception not being thrown?

Posted by yc0p9oo0 on 2021-06-07 in Kafka

I have the following code snippet, which tries to send a message to a Kafka server:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

def main(args: Array[String]): Unit = {

  // Producer settings: a single local broker, no send retries, String keys and values.
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("retries", 0: java.lang.Integer)
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)

  val TOPIC = "test"
  val record = new ProducerRecord(TOPIC, "key", "the end " + new java.util.Date)
  try {
    // get() blocks until the send either completes or fails.
    producer.send(record).get()
    producer.close()
  } catch {
    case e: Exception => println(e.getMessage)
  }

  println("Hello, world!")

}

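As you can see, I set retries to 0, which should mean that if Kafka is unavailable, the producer does not keep retrying the connection to the server indefinitely.
I did not start the Kafka server, because I wanted to see whether it would retry.
I always get the following output: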

22:17:57.891 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node -1 disconnected.
22:17:57.891 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available.
22:17:57.891 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
22:17:57.942 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
22:17:57.994 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
22:17:58.045 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
22:17:58.095 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
22:17:58.146 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
22:17:58.197 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
22:17:58.248 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initialize connection to node localhost:9092 (id: -1 rack: null) for sending metadata request
22:17:58.248 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node localhost:9092 (id: -1 rack: null)
22:17:58.249 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Connection with localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:106)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:444)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:398)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:748)

The exception is never caught.
What am I doing wrong?
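
A note on the log above: the java.net.ConnectException stack trace is printed at DEBUG level on the kafka-producer-network-thread, not on the thread that runs main. A try/catch in main only sees exceptions that reach it through send(...) or get(). The JDK-only sketch below mimics that pattern under simplified assumptions; BackgroundFailureDemo, send, and describeOutcome are made-up names for illustration and are not part of the Kafka API.

```scala
import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, TimeoutException}

object BackgroundFailureDemo {

  // Hypothetical stand-in for a client whose I/O runs on its own thread.
  // The I/O thread catches and logs its connection error; the caller only
  // holds a future that nobody completes while the "broker" is down.
  def send(): CompletableFuture[String] = {
    val result = new CompletableFuture[String]()
    val io = new Thread(new Runnable {
      def run(): Unit = {
        try {
          throw new java.net.ConnectException("Connection refused")
        } catch {
          // Logged here, on the I/O thread; never rethrown to the caller.
          case e: Exception => println(s"[io-thread] DEBUG $e")
        }
      }
    }, "io-thread")
    io.setDaemon(true)
    io.start()
    result
  }

  // Waits on the future and reports which exception type, if any,
  // actually reached the calling thread.
  // Note: TimeoutException here is java.util.concurrent's, not Kafka's.
  def describeOutcome(waitMs: Long): String =
    try {
      send().get(waitMs, TimeUnit.MILLISECONDS)
      "completed"
    } catch {
      case _: TimeoutException   => "caller saw TimeoutException"
      case e: ExecutionException => s"caller saw ExecutionException: ${e.getCause}"
    }

  def main(args: Array[String]): Unit =
    println(describeOutcome(200))
}
```

In this sketch the ConnectException is visible only in the log line of the I/O thread. The caller learns about a problem only once its own wait on the future ends, which is consistent with seeing a stack trace in the output while the catch block stays silent.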
Update
I changed the code to:

try {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("retries", 0: java.lang.Integer)
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val TOPIC = "test"
  val record = new ProducerRecord(TOPIC, "key", "the end " + new java.util.Date)

  val producer = new KafkaProducer[String, String](props)
  producer.send(record).get()
  producer.close()
} catch {
  case e: Exception => println(e.getMessage)
}

But the exception is still not caught.
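
One likely explanation, offered with some caution: retries only controls whether a record is resent after a failed send; it does not limit connection attempts. While no broker is reachable, send waits for topic metadata for up to max.block.ms (60 seconds by default), so a failure would reach the catch block only after that wait. The sketch below builds the same settings with a shorter bound. FailFastProducerConfig, build, and maxBlockMs are illustrative names, and only java.util.Properties is used so that the block runs without the Kafka client on the classpath.

```scala
import java.util.Properties

object FailFastProducerConfig {

  // Builds the producer settings from the question plus a bounded metadata wait.
  // maxBlockMs is an illustrative parameter name, not a Kafka identifier.
  def build(bootstrapServers: String, maxBlockMs: Long): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("retries", 0: java.lang.Integer)
    // Upper bound on how long send() may block while waiting for metadata.
    props.setProperty("max.block.ms", maxBlockMs.toString)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props
  }

  def main(args: Array[String]): Unit = {
    val props = build("localhost:9092", 5000L)
    println(props.getProperty("max.block.ms"))
    // With kafka-clients on the classpath (assumption), the rest of the
    // question's code would stay the same:
    //   val producer = new KafkaProducer[String, String](props)
    //   producer.send(record).get()
  }
}
```

If this reading is right, the catch block in the question should print a timeout message after roughly five seconds instead of appearing to hang. This has not been verified against the client version used in the question.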
