如何使用springcloudstream访问由密码保护的合流模式注册服务器?

r3i60tvu  于 2021-06-06  发布在  Kafka
关注(0)|答案(3)|浏览(321)

我在aiven的schema registry旁边使用springcloudstream,它使用confluent的schema registry。aiven的schema注册表由密码保护。根据这些指令,需要设置这两个配置参数才能成功访问schema registry服务器。

props.put("basic.auth.credentials.source", "USER_INFO");
 props.put("basic.auth.user.info", "avnadmin:schema-reg-password");

当我只使用vanilla java的kafka驱动程序时一切都很好,但是如果我使用spring cloud stream,我不知道如何注入这两个参数。目前,我正在 "basic.auth.user.info" 以及 "basic.auth.credentials.source" 低于 "spring.cloud.stream.kafka.binder.configuration"application.yml 文件。
这样做,我会 "401 Unauthorized" 在架构要注册的行上。
更新1:
根据alin的建议,我更新了schemaregistryclient的bean的配置方式,以便它能够感知ssl上下文。

@Bean
public SchemaRegistryClient schemaRegistryClient(
    @Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint) {
  try {
    final KeyStore keyStore = KeyStore.getInstance("PKCS12");
    keyStore.load(new FileInputStream(
            new File("path/to/client.keystore.p12")),
        "secret".toCharArray());

    final KeyStore trustStore = KeyStore.getInstance("JKS");
    trustStore.load(new FileInputStream(
            new File("path/to/client.truststore.jks")),
        "secret".toCharArray());

    TrustStrategy acceptingTrustStrategy = (X509Certificate[] chain, String authType) -> true;

    SSLContext sslContext = SSLContextBuilder
        .create()
        .loadKeyMaterial(keyStore, "secret".toCharArray())
        .loadTrustMaterial(trustStore, acceptingTrustStrategy)
        .build();

    HttpClient httpClient = HttpClients.custom().setSSLContext(sslContext).build();
    ClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(
        httpClient);
    ConfluentSchemaRegistryClient schemaRegistryClient = new ConfluentSchemaRegistryClient(
        new RestTemplate(requestFactory));
    schemaRegistryClient.setEndpoint(endpoint);
    return schemaRegistryClient;
  } catch (Exception ex) {
    ex.printStackTrace();
    return null;
  }
}

这有助于消除应用程序启动时的错误并注册模式。但是,每当应用程序想要将消息推送到kafka时,就会再次抛出一个新错误。最后,梅尔森的回答也解决了这个问题。

wyyhbhjk

wyyhbhjk1#

由于aiven使用ssl作为kafka安全协议,因此需要使用证书进行身份验证。
您可以通过本页了解它的工作原理。简而言之,您需要运行以下命令来生成证书并导入它们:

openssl pkcs12 -export -inkey service.key -in service.cert -out client.keystore.p12 -name service_key
keytool -import -file ca.pem -alias CA -keystore client.truststore.jks

然后可以使用以下属性来使用证书:

spring.cloud.stream.kafka.streams.binder:
  configuration:
    security.protocol: SSL
    ssl.truststore.location: client.truststore.jks
    ssl.truststore.password: secret
    ssl.keystore.type: PKCS12
    ssl.keystore.location: client.keystore.p12
    ssl.keystore.password: secret
    ssl.key.password: secret
    key.serializer: org.apache.kafka.common.serialization.StringSerializer
    value.serializer: org.apache.kafka.common.serialization.StringSerializer
vhmi4jdf

vhmi4jdf2#

我遇到了与我所处的情况相同的问题,即连接到由aiven托管并由basic auth保护的安全模式注册表。为了使其正常工作,我必须配置以下属性:

spring.kafka.properties.schema.registry.url=https://***.aiven***.com:port
spring.kafka.properties.basic.auth.credentials.source=USER_INFO
spring.kafka.properties.basic.auth.user.info=username:password

我的活页夹的其他属性包括:

spring.cloud.stream.binders.input.type=kafka
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.brokers=https://***.aiven***.com:port <-- different from the before mentioned port
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.security.protocol=SSL
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=truststore.jks
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.truststore.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.type=PKCS12
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location=clientkeystore.p12
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.key.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.streams.binder.autoCreateTopics=false

实际上,spring云流会将spring.kafka.properties.basic*添加到defaultkafkaconsumerfactory中,并将配置添加到kafkaconsumer中。在SpringKafka初始化过程中的某个时刻,将创建一个cachedschemaregistryclient,该客户端使用这些属性进行设置。此客户端包含一个名为configurerestservice的方法,该方法将检查属性Map是否包含“basic.auth.credentials.source”。当我们通过spring.kafka.properties提供时,它将找到这个属性,并在访问schema注册表的端点时负责创建适当的头。
希望你也能成功。
我使用的是springcloud版本greenwich.sr1、springbootstarter2.1.4.release、avro版本1.8.2和confluent.version5.2.1

eyh26e7m

eyh26e7m3#

绑定器配置仅处理已知的使用者和生产者属性。
可以在绑定级别设置任意属性。

spring.cloud.stream.kafka.binding.<binding>.consumer.configuration.basic.auth...

相关问题