Here is my Kafka configuration:
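import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Autowired
    KafkaProperties kafkaprops;

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    // Start from the spring.kafka.* properties, then override individual entries.
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = kafkaprops.buildConsumerProperties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        // Deserialize into the generated Student class rather than GenericRecord.
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return props;
    }

    @Bean
    public ConsumerFactory<String, Student> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    // Container factory used by @KafkaListener methods.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Student> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Student> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}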
I believe there is a better way to rewrite this code using the KafkaTemplate that Spring provides:
@Autowired
KafkaTemplate<String, Student> kafkaTemplate;

@Value(value = "${proj.kafka.topic}")
private String topic; // declaration missing from the snippet as posted; field name assumed

@Autowired
public KafkaConsumerConfig(KafkaTemplate<String, Student> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
}
Is there a way to get a proper Spring configuration file for both my producer and my consumer configuration?
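To make the question concrete, here is a rough sketch of the kind of file I have in mind, assuming Spring Boot's Kafka auto-configuration and Confluent's Avro serializers are on the classpath. The broker address, group id, schema registry URL and topic name are placeholders I made up for illustration. The `spring.kafka.*` keys are the standard Spring Boot ones, and `proj.kafka.topic` is my own custom key from the snippet above.

```properties
# Sketch only: host names, group id, URL and topic below are placeholders.
spring.kafka.bootstrap-servers=localhost:9092

# Consumer side (mirrors the values set by hand in consumerConfigs())
spring.kafka.consumer.group-id=student-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.specific.avro.reader=true

# Producer side, picked up by the auto-configured KafkaTemplate
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

# Passed to both producer and consumer; the Confluent Avro (de)serializers need it
spring.kafka.properties.schema.registry.url=http://localhost:8081

# Custom key read by @Value("${proj.kafka.topic}")
proj.kafka.topic=students
```

If I understand the auto-configuration correctly, with properties like these Spring Boot should create the ConsumerFactory, the kafkaListenerContainerFactory and the KafkaTemplate on its own, so most of the explicit @Bean methods above might become unnecessary. I have not verified this against my setup.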