我有一个kafka客户端代码,它连接到kafka(服务器0.10.1,客户端是0.10.2)代理。代码中有2个主题,包含2个不同的消费群体,还有一个生产者。每隔一段时间从生产者代码获取networkexception(2天一次,5天一次,…)。我们在两个消费者组的日志中都看到消费者组(re)加入信息,后跟producer future.get()调用的networkexception。不知道为什么会出现这个错误。
代码:-
final Future<RecordMetadata> futureResponse =
producer.send(new ProducerRecord<>("ping_topic", "ping"));
futureResponse.get();
例外情况:-
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:70)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:57)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
networkexception的kafka api定义,
“杂项。发出请求时发生与网络相关的ioexception。这可能是因为客户机的元数据已过时,它正在向一个现在已死亡的节点发出请求。”
谢谢
1条答案
按热度按时间5kgi1eie1#
我在测试Kafka消费者时也遇到了同样的错误。我用了一个发件人模板。在使用者配置中,我另外设置了以下属性:
发送消息后,我添加了一个线程睡眠:
这是必要的,使测试工作,但可能不适合你的情况。