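Hi, I'm getting started with Spring Kafka and have successfully added a single KafkaListenerContainerFactory for my listeners. Now I want to add multiple KafkaListenerContainerFactory beans (one for topics that carry JSON messages and another for topics that carry strings). See the code below: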
@EnableKafka
@Configuration
public class KafkaConsumersConfig {

    private final KafkaConfiguration kafkaConfiguration;

    @Autowired
    public KafkaConsumersConfig(KafkaConfiguration kafkaConfiguration) {
        this.kafkaConfiguration = kafkaConfiguration;
    }

    @Bean
    public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(jsonConsumerFactory());
        factory.setConcurrency(3);
        factory.setAutoStartup(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Record> jsonConsumerFactory() {
        JsonDeserializer<Record> jsonDeserializer = new JsonDeserializer<>(Record.class);
        return new DefaultKafkaConsumerFactory<>(jsonConsumerConfigs(), new StringDeserializer(), jsonDeserializer);
    }

    @Bean
    public Map<String, Object> jsonConsumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBrokerAddress());
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfiguration.getJsonGroupId());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval());
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout());
        return propsMap;
    }

    @Bean
    public KafkaListenerContainerFactory<?> kafkaFileListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(fileConsumerFactory());
        factory.setConcurrency(3);
        factory.setAutoStartup(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> fileConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(fileConsumerConfigs());
    }

    @Bean
    public Map<String, Object> fileConsumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBrokerAddress());
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfiguration.getFileGroupId());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval());
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout());
        return propsMap;
    }
}
Running this produces the following error:
Description:
Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
- Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found beans 'jsonConsumerFactory', 'fileConsumerFactory'
Action:
Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.
What am I doing wrong?
3 answers
1szpjjfi #1
I implemented the code below and it works fine for me.
In addition, I set the spring.autoconfigure.exclude property to:
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
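The same exclusion can also be declared in code through the exclude attribute of @SpringBootApplication instead of the properties file. This is a minimal sketch rather than part of the original answer: the class name Application is hypothetical, and the package of KafkaAutoConfiguration is the one quoted in the property above.

```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;

// Hypothetical main class; equivalent in intent to the
// spring.autoconfigure.exclude property shown above.
@SpringBootApplication(exclude = KafkaAutoConfiguration.class)
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
```

With the auto-configuration excluded, Boot should no longer try to build its own default kafkaListenerContainerFactory, which is the method named in the error from the question. It also means Boot no longer supplies Kafka beans such as a KafkaTemplate, so @EnableKafka and any producer-side beans have to be declared by hand.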
Here is my consumer configuration. Consumer 1:
Consumer 2:
In addition, my KafkaTemplate is:
fquxozlt #2
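It looks like you are not relying on Spring Boot's Kafka auto-configuration.
In KafkaAutoConfiguration, Spring Boot provides a kafkaConsumerFactory bean only when no other ConsumerFactory bean exists. Since you have jsonConsumerFactory and fileConsumerFactory, they override the one provided by the auto-configuration.
On the other hand, in KafkaAnnotationDrivenConfiguration, neither of your factories can be applied to the kafkaListenerContainerFactory method, because your ConsumerFactory beans are not of type ConsumerFactory<Object, Object>.
So:
Either exclude KafkaAutoConfiguration from Spring Boot's auto-configuration by adding the following to your application properties file: spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
Or rename one of your KafkaListenerContainerFactory beans to kafkaListenerContainerFactory so that it overrides the one in Boot.
Or declare one of your ConsumerFactory beans as type ConsumerFactory<Object, Object>.
ezykj2lf #3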
You can specify the container factory for each listener in its @KafkaListener definition, as follows:
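A minimal sketch of that idea, not the answerer's original code: each @KafkaListener names a container factory bean through its containerFactory attribute. The bean names are the @Bean method names from the question; the topic names, the class name and the method names are hypothetical, and Record is assumed to be the question's own payload class, in the same package or imported.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical listener class; only the containerFactory values come from the question.
@Component
public class TopicListeners {

    // Uses the factory whose consumer deserializes values into Record via JsonDeserializer.
    // "json-topic" is an assumed topic name.
    @KafkaListener(topics = "json-topic", containerFactory = "kafkaJsonListenerContainerFactory")
    public void onJsonMessage(Record record) {
        System.out.println("JSON record received: " + record);
    }

    // Uses the factory whose consumer is typed ConsumerFactory<String, String>.
    // "file-topic" is an assumed topic name.
    @KafkaListener(topics = "file-topic", containerFactory = "kafkaFileListenerContainerFactory")
    public void onFileMessage(String payload) {
        System.out.println("String payload received: " + payload);
    }
}
```

This only selects which factory builds each listener's container. The startup error in the question concerns Boot's default kafkaListenerContainerFactory, so the listeners above would most likely still need one of the fixes from the earlier answers.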