Problem when adding multiple KafkaListenerContainerFactory beans

Posted by ruyhziif on 2021-06-07 in Kafka

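Hi, I am currently getting started with Spring Kafka and have successfully added a single KafkaListenerContainerFactory for my listeners. Now I want to add multiple KafkaListenerContainerFactory beans (one for topics that carry JSON messages and another for topics that carry strings). See the code below: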

@EnableKafka
@Configuration
public class KafkaConsumersConfig {

    private final KafkaConfiguration kafkaConfiguration;

    @Autowired
    public KafkaConsumersConfig(KafkaConfiguration kafkaConfiguration) {
        this.kafkaConfiguration = kafkaConfiguration;
    }

    @Bean
    public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String,Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(jsonConsumerFactory());
        factory.setConcurrency(3);
        factory.setAutoStartup(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String,Record> jsonConsumerFactory(){
        JsonDeserializer<Record> jsonDeserializer = new JsonDeserializer<>(Record.class);
        return new DefaultKafkaConsumerFactory<>(jsonConsumerConfigs(),new StringDeserializer(), jsonDeserializer);
    }

    @Bean
    public Map<String,Object> jsonConsumerConfigs(){
        Map<String,Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,  kafkaConfiguration.getBrokerAddress());
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfiguration.getJsonGroupId());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval());
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout());
        return propsMap;
    }
    @Bean
    public KafkaListenerContainerFactory<?> kafkaFileListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(fileConsumerFactory());
        factory.setConcurrency(3);
        factory.setAutoStartup(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String,String> fileConsumerFactory(){
        return new DefaultKafkaConsumerFactory<>(fileConsumerConfigs());
    }

    @Bean
    public Map<String,Object> fileConsumerConfigs(){
        Map<String,Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,  kafkaConfiguration.getBrokerAddress());
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfiguration.getFileGroupId());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval());
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout());
        return propsMap;
    }
}

Running this produces the following error:

Description:

Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
    - Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found beans 'jsonConsumerFactory', 'fileConsumerFactory'

Action:

Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.

What am I doing wrong?


1szpjjfi1#

I implemented the code below and it works fine for me.

// LISTENER 1
@Bean
@ConditionalOnMissingBean(name = "yourListenerFactory1")
public ConsumerFactory<String, YourCustomObject1> yourConsumerFactory1() {
   Map<String, Object> props = new HashMap<>();
   props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   props.put(ConsumerConfig.GROUP_ID_CONFIG, "YOUR-GROUP-1");
   return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
     new JsonDeserializer<>(YourCustomObject1.class));
}

@Bean(name = "yourListenerFactory1")
public ConcurrentKafkaListenerContainerFactory<String, YourCustomObject1> 
  yourListenerFactory1() {
   ConcurrentKafkaListenerContainerFactory<String, YourCustomObject1> factory =
       new ConcurrentKafkaListenerContainerFactory<>();
   factory.setConsumerFactory(yourConsumerFactory1());
   ContainerProperties containerProperties = factory.getContainerProperties();
   containerProperties.setPollTimeout(...);
   containerProperties.setAckMode(AckMode...);
   return factory;
}

// LISTENER 2
@Bean
@ConditionalOnMissingBean(name = "yourListenerFactory2")
public ConsumerFactory<String, YourCustomObject2> yourConsumerFactory2() {
   Map<String, Object> props = new HashMap<>();
   props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   props.put(ConsumerConfig.GROUP_ID_CONFIG, "YOUR-GROUP-2");
   return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
      new JsonDeserializer<>(YourCustomObject2.class));
}

@Bean(name = "yourListenerFactory2")
public ConcurrentKafkaListenerContainerFactory<String, YourCustomObject2> 
   yourListenerFactory2() {
    ConcurrentKafkaListenerContainerFactory<String, YourCustomObject2> factory 
         =  new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(yourConsumerFactory2());
    ContainerProperties containerProperties = factory.getContainerProperties();
    containerProperties.setPollTimeout(...);
    containerProperties.setAckMode(AckMode...);
    return factory;
 }

In addition, I set the spring.autoconfigure.exclude property as follows:
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
Here is my consumer configuration.
Consumer 1

@KafkaListener(id = "your-consumer-1",
  topicPattern = "your-topic-1",
  containerFactory = "yourListenerFactory1")
 public void consumer1(YourCustomObject1 data,
                       Acknowledgment acknowledgment,
      @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
      @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
      @Header(KafkaHeaders.OFFSET) List<Long> offsets) throws Exception { ... }

Consumer 2

@KafkaListener(id = "your-consumer-2",
                 topicPattern = "your-topic-2",
                 containerFactory = "yourListenerFactory2")
  public void consumer2(YourCustomObject2 data,
                        Acknowledgment acknowledgment,
      @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
      @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
      @Header(KafkaHeaders.OFFSET) List<Long> offsets) throws Exception { ...  }

Also, my KafkaTemplate is:

@Autowired
KafkaTemplate<String, Object> kafkaTemplate;
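
Note that excluding KafkaAutoConfiguration also removes the KafkaTemplate that Spring Boot would otherwise provide, so an injected KafkaTemplate<String, Object> has to come from a bean defined by hand. The answer does not show that part; the following is only a sketch of what such a definition might look like. The class name KafkaProducerSketchConfig, the broker address, and the choice of JsonSerializer for values are assumptions made for illustration, not something taken from the answer.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

// Hypothetical configuration class; only needed when KafkaAutoConfiguration is excluded.
@Configuration
public class KafkaProducerSketchConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Assumed broker address, matching the placeholder used in the consumer factories.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // String keys and JSON-serialized values (an assumption for this sketch).
        return new DefaultKafkaProducerFactory<>(props, new StringSerializer(), new JsonSerializer<>());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        // The template that @Autowired KafkaTemplate<String, Object> would resolve to.
        return new KafkaTemplate<>(producerFactory());
    }
}
```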

fquxozlt2#

It looks like you do not rely on Spring Boot's Kafka auto-configuration.
Spring Boot provides the following bean in KafkaAutoConfiguration:

@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {

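Since you have defined jsonConsumerFactory and fileConsumerFactory, they override the one provided by the auto-configuration.
On the other hand, none of your factories can be injected into this bean in KafkaAnnotationDrivenConfiguration: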

@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {

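That is because your ConsumerFactory beans are not of type ConsumerFactory<Object, Object>.
So you can do one of the following:
Exclude KafkaAutoConfiguration from Spring Boot auto-configuration by adding the following to your application properties file: spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
Or rename one of your KafkaListenerContainerFactory beans to kafkaListenerContainerFactory so that it overrides the one provided by Boot.
Or declare one of your ConsumerFactory beans with the type ConsumerFactory<Object, Object>.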
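
As a rough illustration of the renaming option, the sketch below names the string-based container factory kafkaListenerContainerFactory. Boot's own factory is guarded by @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory"), so it should back off and no ConsumerFactory<Object, Object> is needed any more. The class name RenamedFactorySketchConfig, the broker address, and the group id are placeholders assumed for this example; the JSON factory from the question would stay as it is and be referenced by name from its listener.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Hypothetical configuration class illustrating the bean-renaming option.
@EnableKafka
@Configuration
public class RenamedFactorySketchConfig {

    // The bean name is taken from the method name. Using the default name
    // "kafkaListenerContainerFactory" makes Boot skip its own container factory.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(fileConsumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> fileConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "file-group");              // assumed group id
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());
    }
}
```

With this naming, a @KafkaListener that does not set containerFactory falls back to this bean, while listeners for JSON topics still need to name their own factory explicitly.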


ezykj2lf3#

You can specify the container factory for each listener in its @KafkaListener definition, as follows:

@KafkaListener(topics = "fileTopic", containerFactory = "kafkaFileListenerContainerFactory")
public void fileConsumer(...) {...}

@KafkaListener(topics = "jsonTopic", containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonConsumer(...) {...}
