我知道我可以从哪个分区记录中找到,但我想知道有什么方法可以动态地获得在特定时刻为使用者分配了哪些分区?也许我需要实现一些侦听器来检测和跟踪分区分配信息?我正在使用 spring-kafka 1.3.2 与 ConcurrentKafkaListenerContainerFactory 以及 @KafkaListener .
spring-kafka 1.3.2
ConcurrentKafkaListenerContainerFactory
@KafkaListener
1tu0hz3e1#
我用不同的方法做了 KafkaListenerEndpointRegistry ```for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {List containers = ((ConcurrentMessageListenerContainer) messageListenerContainer).getContainers();
KafkaListenerEndpointRegistry
List<TopicPartition> topicPartitions = (List<TopicPartition>) containers.stream().flatMap(kafkaMessageListenerContainer -> kafkaMessageListenerContainer.getAssignedPartitions().stream()).collect(Collectors.toList()); partitions.addAll(topicPartitions.stream().map(TopicPartition::partition).collect(Collectors.toList()));
}
rmbxnbpk2#
是的,你可以做:
@Bean public ConsumerAwareRebalanceListener rebalanceListener() { return new ConsumerAwareRebalanceListener() { @Override public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) { // here partitions } }; }
然后将其添加到concurrentkafkalistenercontainerfactory
@Bean public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); ContainerProperties props = factory.getContainerProperties(); props.setConsumerRebalanceListener(rebalanceListener()); return factory; }
2条答案
按热度按时间1tu0hz3e1#
我用不同的方法做了
KafkaListenerEndpointRegistry
```for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
List containers = ((ConcurrentMessageListenerContainer) messageListenerContainer).getContainers();
}
rmbxnbpk2#
是的,你可以做:
然后将其添加到concurrentkafkalistenercontainerfactory