我在一个消费者组中有两个消费者分配了相同的kafka主题分区。我想从消费者b内部得到消费者a的最后一次读取偏移量。有什么想法,怎么实现?
2ledvvac1#
一个分区永远不会分配给同一组中的两个使用者示例。您可以使用下面的脚本来了解最后消耗的偏移量
sh kafka-consumer-groups.sh --bootstrap-server localhost:9092 --new-consumer --group groupname --describe
juud5qan2#
在kafka->bin中使用以下命令使用groupid更改组id: sh kafka-consumer-groups.sh --bootstrap-server localhost:29092 --group groupId --describe 您将得到如下输出:
sh kafka-consumer-groups.sh --bootstrap-server localhost:29092 --group groupId --describe
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID topic1 0 0 35 35 - - - topic2 0 1600 1600 0 - - -
ddhy6vgd3#
kafka存储偏移量是(consumer group id,topic,partition),所以首先要注意的是,从kafka的Angular 来看,没有“consumer a的最后读取偏移量”这样的东西。使用kafka消费api可以获得的所有信息都是针对给定的(组、主题、分区)的。在consumerapi中有两个方法可能很有用。committed():获取给定分区的上一次提交的偏移量(无论提交是由这个进程还是另一个进程进行的)。position():获取要获取的下一条记录的偏移量(如果存在具有该偏移量的记录)。如果这不是你所需要的,那么你将不得不自己实现一些东西。假设您已经知道如何从使用者a读取最后一个偏移量,那么使用者a应该将该值存储在使用者b可用的某个位置。这个位置可能是Kafka本身。例如,消费者a可以将上次读取偏移量发布到一个众所周知的主题,如consumera-p0,消费者b可以订阅这个主题。Zookeeper。再次,在一条众所周知的道路上达成一致。外部数据库。如果两个用户共享同一个操作系统,则有更基本的选择:ipc、文件系统中的文件、内存中受锁保护的变量等。
3条答案
按热度按时间2ledvvac1#
一个分区永远不会分配给同一组中的两个使用者示例。
您可以使用下面的脚本来了解最后消耗的偏移量
juud5qan2#
在kafka->bin中使用以下命令使用groupid更改组id:
sh kafka-consumer-groups.sh --bootstrap-server localhost:29092 --group groupId --describe
您将得到如下输出:ddhy6vgd3#
kafka存储偏移量是(consumer group id,topic,partition),所以首先要注意的是,从kafka的Angular 来看,没有“consumer a的最后读取偏移量”这样的东西。使用kafka消费api可以获得的所有信息都是针对给定的(组、主题、分区)的。在consumerapi中有两个方法可能很有用。
committed():获取给定分区的上一次提交的偏移量(无论提交是由这个进程还是另一个进程进行的)。
position():获取要获取的下一条记录的偏移量(如果存在具有该偏移量的记录)。
如果这不是你所需要的,那么你将不得不自己实现一些东西。假设您已经知道如何从使用者a读取最后一个偏移量,那么使用者a应该将该值存储在使用者b可用的某个位置。这个位置可能是
Kafka本身。例如,消费者a可以将上次读取偏移量发布到一个众所周知的主题,如consumera-p0,消费者b可以订阅这个主题。
Zookeeper。再次,在一条众所周知的道路上达成一致。
外部数据库。
如果两个用户共享同一个操作系统,则有更基本的选择:ipc、文件系统中的文件、内存中受锁保护的变量等。