假设我有一个无限期运行的计时器任务,它迭代kafka集群中的所有使用者组,并为每个组的所有分区输出lag、committed offset和end offset。类似于kafka控制台用户组脚本的工作方式,只是它适用于所有组。
像这样的
单个使用者-不工作-不返回某些提供的主题分区的偏移量(例如,提供10个-返回5个偏移量)
Consumer consumer;
static {
consumer = createConsumer();
}
run() {
List<String> groupIds = getConsumerGroups();
for(String groupId: groupIds) {
List<TopicParition> topicParitions = getTopicParitions(groupId);
consumer.endOffsets(topicParitions); -- Not working - missing offsets for some partitions for some groups (in 10 - out 5)
}
}
多个消费者-工作
run() {
List<String> groupIds = getConsumerGroups();
for(String groupId: groupIds) {
List<TopicParition> topicParitions = getTopicParitions(groupId);
Consumer consumer = createConsumer();
consumer.endOffsets(topicParitions); This works!!!
}
}
versions:kafka-client 2.0.0
我是否错误地使用了消费者api?理想情况下,我想使用单一消费者。
如果你需要更多的细节,请告诉我。
2条答案
按热度按时间3z6pesqy1#
我想你快到了。首先收集所有感兴趣的主题分区,然后发布一个
consumer.endOffsets
命令。请记住,我还没有尝试运行它,但类似的操作应该会起作用:
qv7cva1a2#
这是一个窃听器
Fetcher.fetchOffsetsByTimes()
特别是在里面groupListOffsetRequests
方法,在该方法中,当请求分区偏移量的前导未知或不可用时,逻辑未添加分区以重试。当您在所有使用者组分区中使用单个使用者时,这一点更为明显,其中一些组在我们请求时已经具有主题分区领导信息
endoffsets
而对于没有领导者信息未知或不可用的主题分区,则会因为错误而被取消。后来,我意识到从每个消费者组中提取主题分区并不是一个好主意,而是进行了更改,从中读取主题分区
AdminClient.listTopics & AdminClient.describeTopics
一下子传给Consumer.endOffsets
.尽管这并不能完全解决这个问题,因为主题/分区在多次运行之间可能仍然不可用或未知。
更多信息可以找到-
KAFKA-7044
&pull request
. 这已经在2.1.0版本中修复并计划好了。