我有一个应用程序,可能需要多个生产者。我看到的所有代码示例似乎都支持一个生产者,即在应用程序启动期间从应用程序读取配置。如果有多个producer,并且我们希望传入不同的producer配置,那么spring中是否有现成的支持?或者我应该不用Spring吗?
2j4z5cfb1#
您必须创建两个不同的 ProducerFactory 下面是一个例子
ProducerFactory
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import java.util.HashMap; @Configuration public class KafkaProducerConfig { @Bean public ProducerFactory<String, String> confluentProducerFactory() { HashMap<String, Object> configProps = new HashMap<String, Object>(); configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public ProducerFactory<String, String> cloudraProducerFactory() { HashMap<String, Object> configProps = new HashMap<String, Object>(); configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094"); configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean(name = "confluent") public KafkaTemplate<String, String> confluentKafkaTemplate() { return new KafkaTemplate<>(confluentProducerFactory()); } @Bean(name = "cloudera") public KafkaTemplate<String, String> clouderaKafkaTemplate() { return new KafkaTemplate<>(cloudraProducerFactory()); } } public class ProducerExample { @Autowired @Qualifier("cloudera") private KafkaTemplate clouderaKafkaTemplate; @Autowired @Qualifier("confluent") private KafkaTemplate confluentKafkaTemplate; public void send() { confluentKafkaTemplate.send("TestConfluent", "hey there..confluent"); clouderaKafkaTemplate.send("TestCloudEra","hey there.. cloudera"); } }
kcugc4gi2#
您可以创建多个 Producer 示例( KafkaTemplate )通过相同的 ProducerFactory .如果你需要不同的Kafka配置,你需要不同的 ProducerFactory 示例。
Producer
KafkaTemplate
2条答案
按热度按时间2j4z5cfb1#
您必须创建两个不同的
ProducerFactory
下面是一个例子kcugc4gi2#
您可以创建多个
Producer
示例(KafkaTemplate
)通过相同的ProducerFactory
.如果你需要不同的Kafka配置,你需要不同的
ProducerFactory
示例。