我有以下代码片段,尝试向kafka服务器发送消息:
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("retries", 0: java.lang.Integer)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val TOPIC = "test"
val record = new ProducerRecord(TOPIC, "key", "the end " + new java.util.Date)
try {
producer.send(record).get()
producer.close()
} catch {
case e: Exception => println(e.getMessage)
}
println("Hello, world!")
}
如你所见,我 retries
至 0
,这应该意味着,如果kafka不可用,它不应该无限次地重试连接到服务器。
我没有启动kafka服务器,因为我想看看是否要重试。
我总是有以下输出:
22:17:57.891 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Node -1 disconnected.
22:17:57.891 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available.
22:17:57.891 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
22:17:57.942 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
22:17:57.994 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
22:17:58.045 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
22:17:58.095 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
22:17:58.146 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
22:17:58.197 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Give up sending metadata request since no node is available
22:17:58.248 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initialize connection to node localhost:9092 (id: -1 rack: null) for sending metadata request
22:17:58.248 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating connection to node localhost:9092 (id: -1 rack: null)
22:17:58.249 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Connection with localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:106)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:444)
at org.apache.kafka.common.network.Selector.poll(Selector.java:398)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)
例外情况从未被发现。
我做错什么了?
更新
我将代码更改为:
try {
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("retries", 0: java.lang.Integer)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val TOPIC = "test"
val record = new ProducerRecord(TOPIC, "key", "the end " + new java.util.Date)
val producer = new KafkaProducer[String, String](props)
producer.send(record).get()
producer.close()
} catch {
case e: Exception => println(e.getMessage)
}
但它仍然没有被抓住。
暂无答案!
目前还没有任何答案,快来回答吧!