下面是必需的场景。topic-1有6个分区,现在我想创建3个消费组cg1、cg2和cg3,并这样Map它(cg1-0,1;cg2-2,3;cg3-4,5)。如何使用kafka-console-consumer.sh或kafka-consumer-groups.sh创建它甚至Kafka的文档也解释了这个场景,但没有提到如何做到这一点。感谢您的帮助!!!
lskq00tm1#
kafka使用者组是共享相同组id的使用者的集合。使用者组通过在使用者之间共享分区来分发处理。下图显示了一个包含三个分区的主题和一个包含两个成员的使用者组。主题中的每个分区只分配给组中的一个成员。注意:具有n个分区的主题最多可由每个使用者1个分区的使用者组的n个使用者使用。在您的例子中,如果在主题上使用使用者组,则意味着所有分区都将分配给该使用者组。但是如果你对消费群不感兴趣,你可以直接给每个消费群分配一个分区,这样就不会出现重新平衡的情况我正在使用Kafka融合Kafka2.6.0-5.1.2:
sh kafka-console-consumer --bootstrap-server localhost:9092 --partition 0 --topic abc --group cg1 sh kafka-console-consumer --bootstrap-server localhost:9092 --partition 1 --topic abc --group cg1
--partitioninteger:partition:除非指定'--offset',否则从消耗开始消耗的分区从分区的末尾开始。使用消费者组,您可以描述消费者的详细信息
sh kafka-consumer-groups --bootstrap-server localhost:9020 --describe --group a TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID abc 0 123 678 0 - - - abc 1 234 345 0 - - -
您还可以通过java手动分配分区,如下所示
List<TopicPartition> partitions = new ArrayList<>(); partitions.add(new TopicPartition("abc", 0)); partitions.add(new TopicPartition("abc", 1)); ...... new KafkaConsumer<>(consumerProperties).assign(partitions);
注意,不能通过主题订阅(即使用subscribe)将手动分区分配(即使用assign)与动态分区分配混合使用。参考号:这里有以下几种替代方法:使用3个单独的主题使用单独的使用者组来使用消息。使用消息时以编程方式筛选分区。
nvbavucw2#
我现在不能尝试,但我认为你需要通过它作为一个消费者财产
kafka-console-consumer.sh --consumer-property group.id=${your_group_id}
或者如果你有一个配置文件
kafka-console-consumer.sh --consumer.config ${your_config_file}
2条答案
按热度按时间lskq00tm1#
kafka使用者组是共享相同组id的使用者的集合。使用者组通过在使用者之间共享分区来分发处理。
下图显示了一个包含三个分区的主题和一个包含两个成员的使用者组。主题中的每个分区只分配给组中的一个成员。
注意:具有n个分区的主题最多可由每个使用者1个分区的使用者组的n个使用者使用。
在您的例子中,如果在主题上使用使用者组,则意味着所有分区都将分配给该使用者组。
但是如果你对消费群不感兴趣,你可以直接给每个消费群分配一个分区,这样就不会出现重新平衡的情况
我正在使用Kafka融合Kafka2.6.0-5.1.2:
--partitioninteger:partition:除非指定'--offset',否则从消耗开始消耗的分区从分区的末尾开始。
使用消费者组,您可以描述消费者的详细信息
您还可以通过java手动分配分区,如下所示
注意,不能通过主题订阅(即使用subscribe)将手动分区分配(即使用assign)与动态分区分配混合使用。
参考号:这里
有以下几种替代方法:
使用3个单独的主题使用单独的使用者组来使用消息。
使用消息时以编程方式筛选分区。
nvbavucw2#
我现在不能尝试,但我认为你需要通过它作为一个消费者财产
或者如果你有一个配置文件