org.apache.kafka.common.errors.networkexception

yxyvkwin  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(988)

我有一个kafka客户端代码,它连接到kafka(服务器0.10.1,客户端是0.10.2)代理。代码中有2个主题,包含2个不同的消费群体,还有一个生产者。每隔一段时间从生产者代码获取networkexception(2天一次,5天一次,…)。我们在两个消费者组的日志中都看到消费者组(re)加入信息,后跟producer future.get()调用的networkexception。不知道为什么会出现这个错误。
代码:-

final Future<RecordMetadata> futureResponse = 
producer.send(new ProducerRecord<>("ping_topic", "ping"));  
futureResponse.get();

例外情况:-

org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:70)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:57)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)

networkexception的kafka api定义,
“杂项。发出请求时发生与网络相关的ioexception。这可能是因为客户机的元数据已过时,它正在向一个现在已死亡的节点发出请求。”
谢谢

5kgi1eie

5kgi1eie1#

我在测试Kafka消费者时也遇到了同样的错误。我用了一个发件人模板。在使用者配置中,我另外设置了以下属性:

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);

发送消息后,我添加了一个线程睡眠:

ListenableFuture<SendResult<String, String>> future =
    senderTemplate.send(MyConsumer.TOPIC_NAME, jsonPayload);

  Thread.Sleep(10000).

这是必要的,使测试工作,但可能不适合你的情况。

相关问题