我有一个 KafkaConsumer
在java中,并且当前它从未退出 .poll
方法。当我在调试模式下深入研究源代码时,我发现它被困在了while循环中 AbstractCoordinator.ensureCoordinatorKnown()
,因为找不到协调器。
未来从 sendGroupMetadataRequest()
在循环中第一次失败 org.apache.kafka.clients.consumer.internals.SendFailedException
,然后每次都会失败 org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The group coordinator is not available.
. 有人知道为什么会这样吗?
如果我使用控制台生产者/消费者,我就能够成功地发送和接收消息,只有当我使用kafkaconsumer的实现时。另外,使用者在我的两台服务器上工作,所以我知道这不是使用者的实现。
以下是我的消费者创建时使用的属性:
Properties props = new Properties();
props.put("bootstrap.servers", "myserver:9000);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
编辑:
主题肯定是在消费者开始之前创建的。
编辑2:我删除了集群中的所有代理并重新创建了它们,现在我在另一个地方失败了。在 AbstractCoordinator.ensureActiveGroup()
在试图重新加入时,未来从 performGroupJoin()
多次失败 org.apache.kafka.common.errors.NotCoordinatorForGroupException: This is not the correct coordinator for this group.
. 还是不知道发生了什么。
编辑3:我删除了代理并用不同的id重新创建了它们,现在 .poll()
方法正在返回,并且它正在成功地使用消息。不过,我还是想知道为什么一开始就失败了,这样我就可以确保它不会再发生了。
1条答案
按热度按时间aiazj4mn1#
删除代理并创建新的代理修复了问题。但仍不确定经纪人是否出了问题。