我在aiven的schema registry旁边使用springcloudstream,它使用confluent的schema registry。aiven的schema注册表由密码保护。根据这些指令,需要设置这两个配置参数才能成功访问schema registry服务器。
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "avnadmin:schema-reg-password");
当我只使用vanilla java的kafka驱动程序时一切都很好,但是如果我使用spring cloud stream,我不知道如何注入这两个参数。目前,我正在 "basic.auth.user.info"
以及 "basic.auth.credentials.source"
低于 "spring.cloud.stream.kafka.binder.configuration"
在 application.yml
文件。
这样做,我会 "401 Unauthorized"
在架构要注册的行上。
更新1:
根据alin的建议,我更新了schemaregistryclient的bean的配置方式,以便它能够感知ssl上下文。
@Bean
public SchemaRegistryClient schemaRegistryClient(
@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint) {
try {
final KeyStore keyStore = KeyStore.getInstance("PKCS12");
keyStore.load(new FileInputStream(
new File("path/to/client.keystore.p12")),
"secret".toCharArray());
final KeyStore trustStore = KeyStore.getInstance("JKS");
trustStore.load(new FileInputStream(
new File("path/to/client.truststore.jks")),
"secret".toCharArray());
TrustStrategy acceptingTrustStrategy = (X509Certificate[] chain, String authType) -> true;
SSLContext sslContext = SSLContextBuilder
.create()
.loadKeyMaterial(keyStore, "secret".toCharArray())
.loadTrustMaterial(trustStore, acceptingTrustStrategy)
.build();
HttpClient httpClient = HttpClients.custom().setSSLContext(sslContext).build();
ClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(
httpClient);
ConfluentSchemaRegistryClient schemaRegistryClient = new ConfluentSchemaRegistryClient(
new RestTemplate(requestFactory));
schemaRegistryClient.setEndpoint(endpoint);
return schemaRegistryClient;
} catch (Exception ex) {
ex.printStackTrace();
return null;
}
}
这有助于消除应用程序启动时的错误并注册模式。但是,每当应用程序想要将消息推送到kafka时,就会再次抛出一个新错误。最后,梅尔森的回答也解决了这个问题。
3条答案
按热度按时间wyyhbhjk1#
由于aiven使用ssl作为kafka安全协议,因此需要使用证书进行身份验证。
您可以通过本页了解它的工作原理。简而言之,您需要运行以下命令来生成证书并导入它们:
然后可以使用以下属性来使用证书:
vhmi4jdf2#
我遇到了与我所处的情况相同的问题,即连接到由aiven托管并由basic auth保护的安全模式注册表。为了使其正常工作,我必须配置以下属性:
我的活页夹的其他属性包括:
实际上,spring云流会将spring.kafka.properties.basic*添加到defaultkafkaconsumerfactory中,并将配置添加到kafkaconsumer中。在SpringKafka初始化过程中的某个时刻,将创建一个cachedschemaregistryclient,该客户端使用这些属性进行设置。此客户端包含一个名为configurerestservice的方法,该方法将检查属性Map是否包含“basic.auth.credentials.source”。当我们通过spring.kafka.properties提供时,它将找到这个属性,并在访问schema注册表的端点时负责创建适当的头。
希望你也能成功。
我使用的是springcloud版本greenwich.sr1、springbootstarter2.1.4.release、avro版本1.8.2和confluent.version5.2.1
eyh26e7m3#
绑定器配置仅处理已知的使用者和生产者属性。
可以在绑定级别设置任意属性。