kafkaconsumer从不退出poll方法-groupcoordinatornotavailableexception

6uxekuva  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(448)

我有一个 KafkaConsumer 在java中,并且当前它从未退出 .poll 方法。当我在调试模式下深入研究源代码时,我发现它被困在了while循环中 AbstractCoordinator.ensureCoordinatorKnown() ,因为找不到协调器。
未来从 sendGroupMetadataRequest() 在循环中第一次失败 org.apache.kafka.clients.consumer.internals.SendFailedException ,然后每次都会失败 org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The group coordinator is not available. . 有人知道为什么会这样吗?
如果我使用控制台生产者/消费者,我就能够成功地发送和接收消息,只有当我使用kafkaconsumer的实现时。另外,使用者在我的两台服务器上工作,所以我知道这不是使用者的实现。
以下是我的消费者创建时使用的属性:

Properties props = new Properties();
props.put("bootstrap.servers", "myserver:9000);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");

编辑:
主题肯定是在消费者开始之前创建的。
编辑2:我删除了集群中的所有代理并重新创建了它们,现在我在另一个地方失败了。在 AbstractCoordinator.ensureActiveGroup() 在试图重新加入时,未来从 performGroupJoin() 多次失败 org.apache.kafka.common.errors.NotCoordinatorForGroupException: This is not the correct coordinator for this group. . 还是不知道发生了什么。
编辑3:我删除了代理并用不同的id重新创建了它们,现在 .poll() 方法正在返回,并且它正在成功地使用消息。不过,我还是想知道为什么一开始就失败了,这样我就可以确保它不会再发生了。

aiazj4mn

aiazj4mn1#

删除代理并创建新的代理修复了问题。但仍不确定经纪人是否出了问题。

相关问题