我有一个用例,它需要我有多个Kafka Producer(基于配置)。也就是说,如果我的配置有3个想要接收数据的租户,我想启动3个生产者(所有三个写入3个不同的集群)。
我尝试将我的Kafka配置设置为:
@Bean
@Primary
public Map<String, KafkaTemplate<String, String>> kafkaTemplates() {
log.info("setting up kafka templates");
final String ids = configuration.getIds();
Map<String, KafkaTemplate<String, String>> kafkaTemplates = new HashMap<>();
for (String id : ids.split(",")) {
kafkaTemplates.put(id, kafkaTemplate(id));
}
return kafkaTemplates;
}
private KafkaTemplate<String, String> kafkaTemplate(String id) {
log.info("setting up kafka template");
try {
Properties producerProperties = new Properties();
try (InputStream kins = Files.newInputStream(new File("/opt/user-secrets/", id + ".properties").toPath())) {
producerProperties.load(kins);
} catch (IOException e) {
throw new RuntimeException(e);
}
Map props = producerProperties;
Map<String, Object> props1 = (Map<String, Object>) props;
ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props1);
return new KafkaTemplate<>(producerFactory);
} catch (Exception e) {
log.error("e {}", e.getMessage(), e);
throw new RuntimeException(e);
}
}
字符串
当我尝试在Producer服务中访问KafkaTemplate时,这导致了kafkaTemplates.get(id)
上的NPE:
@Service
@Slf4j
@Getter
@Setter
public class KafkaProducerService {
private final Map<String, KafkaTemplate<String, String>> kafkaTemplates;
private Schema avroSchema;
public KafkaProducerService(Map<String, KafkaTemplate<String, String>> kafkaTemplates) {
this.kafkaTemplates = kafkaTemplates;
}
public void produce(String id, String topic, String key, VehicleHeartbeat message) {
GenericRecord record = generateAvroRecord(message);
log.info("generic record: {}", record);
kafkaTemplates.get(id).send(topic, key, record.toString());
}
型
我该如何处理Spring的Kafka?
1条答案
按热度按时间vulvrdjw1#
可能与其他
Map
bean冲突。你可以尝试在
kafkaTemplates
方法上替换@Bean
,如下所示:字符串
然后在服务构造函数中,你用相同的名字来限定它,这样你就可以确保注入正确的bean:
型