SpringKafka中是否有多个生产者的代码示例?

cedebl8k  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(423)

我有一个应用程序,可能需要多个生产者。我看到的所有代码示例似乎都支持一个生产者,即在应用程序启动期间从应用程序读取配置。如果有多个producer,并且我们希望传入不同的producer配置,那么spring中是否有现成的支持?或者我应该不用Spring吗?

2j4z5cfb

2j4z5cfb1#

您必须创建两个不同的 ProducerFactory 下面是一个例子

import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;

    import java.util.HashMap;

    @Configuration
    public class KafkaProducerConfig {

        @Bean
        public ProducerFactory<String, String> confluentProducerFactory() {

            HashMap<String, Object> configProps = new HashMap<String, Object>();
            configProps.put(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "localhost:9092");
            configProps.put(
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            configProps.put(
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }

        @Bean
        public ProducerFactory<String, String> cloudraProducerFactory() {

            HashMap<String, Object> configProps = new HashMap<String, Object>();
            configProps.put(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "localhost:9094");
            configProps.put(
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            configProps.put(
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }

        @Bean(name = "confluent")
        public KafkaTemplate<String, String> confluentKafkaTemplate() {
            return new KafkaTemplate<>(confluentProducerFactory());
        }

        @Bean(name = "cloudera")
        public KafkaTemplate<String, String> clouderaKafkaTemplate() {
            return new KafkaTemplate<>(cloudraProducerFactory());
        }

    }

public class ProducerExample {

    @Autowired
    @Qualifier("cloudera")
    private KafkaTemplate clouderaKafkaTemplate;

    @Autowired
    @Qualifier("confluent")
    private KafkaTemplate confluentKafkaTemplate;

    public void send() {
        confluentKafkaTemplate.send("TestConfluent", "hey there..confluent");
        clouderaKafkaTemplate.send("TestCloudEra","hey there.. cloudera");
    }

}
kcugc4gi

kcugc4gi2#

您可以创建多个 Producer 示例( KafkaTemplate )通过相同的 ProducerFactory .
如果你需要不同的Kafka配置,你需要不同的 ProducerFactory 示例。

相关问题