我有一个springboot应用程序,它通过rest接收数据,基于一些业务逻辑,我需要将数据转发到两个不同的kafka集群,它们有自己的kerberos密钥和jaas文件。
我在两个不同的对象示例中编写了两个不同的producer示例,它们具有以下属性。
@Service
public class EventProducer {
private Logger logger = LoggerFactory.getLogger(EventProducer.class);
Producer<String, String> kafkaProducer = null;
@Autowired
public Producer<String, String> createProducer() {
if (kafkaProducer == null) {
Properties props = getKafkaConfig();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Cluste_1_hostaddress:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG,"usertest");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, "3");
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1600);
System.setProperty("javax.security.auth.useSubjectCredsOnly", "true");
System.setProperty("java.security.auth.login.config", "/home/user/clusrter_1_jaas.conf);
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("kafka.cluster.SecurityProtocol",PLAINTEXTSASL);
props.put("sasl.kerberos.service.name", "kafka");
props.put("sasl.kerberos", "sasl.kerberos.service.namekafka");
props.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
props.put("sasl.enabled.mechanisms", "PLAIN");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
kafkaProducer = new KafkaProducer<String, String>(props);
}
return kafkaProducer;
}
}
第二生产商
@Service
public class MovementProducer {
private Logger logger = LoggerFactory.getLogger(MovementProducer.class);
Producer<String, String> kafkaProducer = null;
@Autowired
public Producer<String, String> createProducer() {
if (kafkaProducer == null) {
Properties props = getKafkaConfig();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Cluste_2_hostaddress:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG,"usertest");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, "3");
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1600);
System.setProperty("javax.security.auth.useSubjectCredsOnly", "true");
System.setProperty("java.security.auth.login.config", "/home/user/clusrter_2_jaas.conf);
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("kafka.cluster.SecurityProtocol",PLAINTEXTSASL);
props.put("sasl.kerberos.service.name", "kafka");
props.put("sasl.kerberos", "sasl.kerberos.service.namekafka");
props.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
props.put("sasl.enabled.mechanisms", "PLAIN");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
kafkaProducer = new KafkaProducer<String, String>(props);
}
return kafkaProducer;
}
}
当我以两个服务的形式启动它时,只启用生产者示例,但当我在一个jar中同时启用两个示例时,只有一个生产者工作,而另一个生产者会遇到身份验证问题。
我觉得这是由于system.setproperty(“java.security.auth.login.config”,“”),因为它是全局系统变量,所以当我在单个进程中同时使用这两个变量时,它会覆盖,所以只有一个可以工作。
那么除了启动两个进程之外,还有什么办法解决这个问题呢。我只有一个 Spring 服务,应该能够产生两个不同的Kafka集群。。
暂无答案!
目前还没有任何答案,快来回答吧!