consumer.endoffsets在kafka中是如何工作的?

2ul0zpep  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(795)

假设我有一个无限期运行的计时器任务,它迭代kafka集群中的所有使用者组,并为每个组的所有分区输出lag、committed offset和end offset。类似于kafka控制台用户组脚本的工作方式,只是它适用于所有组。
像这样的
单个使用者-不工作-不返回某些提供的主题分区的偏移量(例如,提供10个-返回5个偏移量)

Consumer consumer;

static {
  consumer = createConsumer();
}

run() { 
  List<String> groupIds = getConsumerGroups();
  for(String groupId: groupIds) {
       List<TopicParition> topicParitions =  getTopicParitions(groupId);
       consumer.endOffsets(topicParitions); -- Not working - missing offsets for some partitions for some groups (in 10 - out 5)
   }
}

多个消费者-工作

run() { 
   List<String> groupIds = getConsumerGroups();
   for(String groupId: groupIds) {
        List<TopicParition> topicParitions =  getTopicParitions(groupId);
        Consumer consumer = createConsumer();
        consumer.endOffsets(topicParitions); This works!!!
   }
 }

versions:kafka-client 2.0.0
我是否错误地使用了消费者api?理想情况下,我想使用单一消费者。
如果你需要更多的细节,请告诉我。

3z6pesqy

3z6pesqy1#

我想你快到了。首先收集所有感兴趣的主题分区,然后发布一个 consumer.endOffsets 命令。
请记住,我还没有尝试运行它,但类似的操作应该会起作用:

run() { 
   Consumer consumer = createConsumer();
   List<String> groupIds = getConsumerGroups();
   List<TopicPartition> topicPartitions = new ArrayList<>();

   for (String groupId: groupIds) {
        topicPartitions.addAll(getTopicPartitions(groupId));
   }

   consumer.endOffsets(topicPartitions); 
}
qv7cva1a

qv7cva1a2#

这是一个窃听器 Fetcher.fetchOffsetsByTimes() 特别是在里面 groupListOffsetRequests 方法,在该方法中,当请求分区偏移量的前导未知或不可用时,逻辑未添加分区以重试。
当您在所有使用者组分区中使用单个使用者时,这一点更为明显,其中一些组在我们请求时已经具有主题分区领导信息 endoffsets 而对于没有领导者信息未知或不可用的主题分区,则会因为错误而被取消。
后来,我意识到从每个消费者组中提取主题分区并不是一个好主意,而是进行了更改,从中读取主题分区 AdminClient.listTopics & AdminClient.describeTopics 一下子传给 Consumer.endOffsets .
尽管这并不能完全解决这个问题,因为主题/分区在多次运行之间可能仍然不可用或未知。
更多信息可以找到- KAFKA-7044 & pull request . 这已经在2.1.0版本中修复并计划好了。

相关问题