java: How to use the Kafka configuration provided by Spring instead of a verbose hand-written configuration

t8e9dugd  posted on 2021-06-04 in Kafka

This is my Kafka configuration:

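import java.util.Map;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Student and Receiver are the application's own classes (not shown).
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    // Holds everything Spring Boot bound from the spring.kafka.* properties
    @Autowired
    KafkaProperties kafkaprops;

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public Map<String, Object> consumerConfigs() {
        // Start from the Spring Boot consumer properties, then set the remaining keys by hand
        Map<String, Object> props = kafkaprops.buildConsumerProperties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        // Deserialize into the generated specific record class instead of GenericRecord
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        return props;
    }

    @Bean
    public ConsumerFactory<String, Student> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Student> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Student> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}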

I believe there is a better way to rewrite this code using the KafkaTemplate that Spring provides:

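// Injected through the constructor below, so no field-level @Autowired is needed
private final KafkaTemplate<String, Student> kafkaTemplate;

// The field declaration that this annotation belongs to is missing from the post
@Value(value = "${proj.kafka.topic}")

@Autowired
public KafkaConsumerConfig(KafkaTemplate<String, Student> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
}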

Is there a way to get a proper Spring configuration file for both my producer and my consumer configuration?
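
One possible direction, sketched under assumptions rather than offered as a confirmed answer: Spring Boot binds the `spring.kafka.*` properties and, when the application defines no beans of those types itself, auto-configures a consumer factory, a listener container factory named `kafkaListenerContainerFactory`, and a `KafkaTemplate`. Most of the Java configuration in the question could then move into `application.properties`. The broker address, group id, topic name, and schema registry URL below are placeholder values that do not come from the question, and the producer serializers are an assumption that mirrors the consumer side.

```properties
# Placeholder broker address (assumption)
spring.kafka.bootstrap-servers=localhost:9092

# Common client properties, passed to both producer and consumer.
# The schema registry URL is a placeholder (assumption).
spring.kafka.properties.schema.registry.url=http://localhost:8081

# Consumer settings that the question sets by hand in consumerConfigs()
spring.kafka.consumer.group-id=student-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.specific.avro.reader=true

# Producer settings used by the auto-configured KafkaTemplate (assumed to mirror the consumer)
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer

# Application-specific topic property referenced in the question; the value is a placeholder
proj.kafka.topic=students
```

With properties like these in place, the `consumerConfigs()`, `consumerFactory()`, and `kafkaListenerContainerFactory()` beans would likely become unnecessary, and a `KafkaTemplate<String, Student>` could be injected directly. Whether every key applies depends on the Spring Boot and Confluent versions in use.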
