kafkaproducer连接被拒绝

rfbsl7qr  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(611)

我试着给Kafka发送一些数据,但当我运行代码时

13:20:17.688 [kafka-producer-network-thread | producer-1] 
DEBUG org.apache.kafka.clients.NetworkClient - Node -1 disconnected.
13:20:17.784 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for sending metadata request
13:20:17.784 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at kafkaAdress:2181.
13:20:18.781 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - Connection with kafkaAdress/addressId disconnected
java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:54)
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:72)
at org.apache.kafka.common.network.Selector.poll(Selector.java:274)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
at java.lang.Thread.run(Unknown Source)

代码:

String topic = "TST";
    Properties props = new Properties();
    props.put("bootstrap.servers", "kafkaAdress:2181");
    props.put("key.serializer",      "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    for(int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<String, String>(topic, "TestMessage"));    
    producer.close();

有人知道怎么解决这个问题吗?
我使用Kafka0.9.1

sshcrbum

sshcrbum1#

您的配置有些错误,bootstrap.server是kafka代理的地址,而不是zookeeper。生产者总是使用代理的地址来发布消息。

lmyy7pcs

lmyy7pcs2#

自从kafka 0.9以来,producerapi不再使用zookeeper。
属性bootstrap.servers应该包含用于建立到kafka集群的初始连接的代理列表。
2181是zookeeper端口。代理的默认端口是9092。

相关问题