SpringBootKafkalistener和多组ID

aij0ehis  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(687)
@KafkaListener(topics = "test, groupId = "G1")
  public void receiveMessage(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
      @Payload final String message) {
    doStuff();
  }

我有6个分区,想在同一组中创建6个消费者,比如说在一台机器中。我怎样才能做到呢?1.通过部署6个应用示例2.通过在同一个应用程序(重复代码)中创建6个消费者,是否有其他选择/建议?

bejyjqdl

bejyjqdl1#

您可以使用自定义的kafkalistener,因为使用默认kafkalistener必须在@kafkalistener注解或application.yml/properties文件中指定groupid。像下面的代码一样,使用uuid生成多个groupid。

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}
a6b3iqyw

a6b3iqyw2#

如果你说的是同样的话 consumer group ,只需配置 ConcurrentKafkaListenerContainerFactory 为了一个合适的 concurrency 配置属性:

spring.kafka.listener.concurrency = 6

请参阅《SpringKafka参考手册》了解更多信息 concurrency 选择意味着。

相关问题