java—SpringBoot中创建kafkatemplate的正确方法

xu3bshqb  于 2021-06-06  发布在  Kafka
关注(0)|答案(3)|浏览(946)

我尝试在spring启动应用程序中配置apachekafka。我阅读了此文档并遵循以下步骤:
1) 我把这行字加到 aplication.yaml :

spring:
  kafka:
    bootstrap-servers: kafka_host:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer

2) 我创建新主题:

@Bean
    public NewTopic responseTopic() {
        return new NewTopic("new-topic", 5, (short) 1);
    }

现在我想用 KafkaTemplate :

private final KafkaTemplate<String, byte[]> kafkaTemplate;

public KafkaEventBus(KafkaTemplate<String, byte[]> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
}

但intellij ide强调:

要解决这个问题,我需要创建bean:

@Bean
public KafkaTemplate<String, byte[]> myMessageKafkaTemplate() {
    return new KafkaTemplate<>(greetingProducerFactory());
}

并将其传递给施工方当事人 greetingProducerFactory() :

@Bean
public ProducerFactory<String, byte[]> greetingProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka_hist4:9092");
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

但是如果我需要创建producerfactory手册,那么在application.yaml中设置有什么意义呢?

nsc4cvqm

nsc4cvqm1#

我认为你完全可以无视idea的警告;我在boot的模板中使用不同的泛型类型没有问题。。。

@SpringBootApplication
public class So55280173Application {

    public static void main(String[] args) {
        SpringApplication.run(So55280173Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template, Foo foo) {
        return args -> {
            template.send("so55280173", "foo");
            if (foo.template == template) {
                System.out.println("they are the same");
            }
        };
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so55280173", 1, (short) 1);
    }

}

@Component
class Foo {

    final KafkaTemplate<String, String> template;

    @Autowired
    Foo(KafkaTemplate<String, String> template) {
        this.template = template;
    }

}

they are the same
2ledvvac

2ledvvac2#

我最初也有同样的问题,但当我执行它时,没有出现错误,工作正常。
忽略intellij idea的警告,这可能是idea在找出自动连线组件时遇到的问题。

v8wbuo2f

v8wbuo2f3#

默认情况下 KafkaTemplate<Object, Object> 是由中的spring boot创建的 KafkaAutoConfiguration 班级。因为spring在依赖注入期间考虑泛型类型信息,所以不能将默认bean自动连接到 KafkaTemplate<String, byte[]> .

相关问题