Connecting to multiple Kafka servers with Spring Boot

kninwzqo posted on 2021-06-07 in Kafka

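In a Spring Boot application, I want to connect to two different Kafka servers at the same time. I am using KafkaAdmin and AdminClient to establish the connections and perform CRUD operations.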

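    // server, krb5Location, jaasConfigLocation, sslTruststoreLocation and
    // sslTruststorePassowrd are fields of this configuration class (not shown).
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();

        // These are JVM-wide system properties, not per-client settings.
        System.setProperty("java.security.krb5.conf", krb5Location);
        System.setProperty("java.security.auth.login.config", jaasConfigLocation);

        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        configs.put("security.protocol", "SASL_SSL");
        configs.put("ssl.truststore.location", sslTruststoreLocation);
        configs.put("ssl.truststore.password", sslTruststorePassowrd);

        return new KafkaAdmin(configs);
    }

    // @PostConstruct is not needed on a @Bean method; the KafkaAdmin bean
    // is injected as a method parameter instead of read from a field.
    @Bean
    public AdminClient config(KafkaAdmin kafkaAdmin) {
        return AdminClient.create(kafkaAdmin.getConfig());
    }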

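Server 2 is configured the same way in the same Spring Boot application.
If I load the configuration for both Kafka servers at once during application initialization, the following error is shown: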

>>>KRBError:
     cTime is Sun Jun 03 14:23:02 IST 2001 991558382000
     sTime is Tue Nov 20 10:46:53 IST 2018 1542691013000
     suSec is 512097
     error code is 7
     error Message is Server not found in Kerberos database
     cname is config1@servername.com
     sname is config2@servernname.com
     msgType is 30
    at sun.security.krb5.KrbTgsRep.<init>(KrbTgsRep.java:73)
    at sun.security.krb5.KrbTgsReq.getReply(KrbTgsReq.java:251)
    at sun.security.krb5.KrbTgsReq.sendAndGetCreds(KrbTgsReq.java:262)
    at sun.security.krb5.internal.CredentialsUtil.serviceCreds(CredentialsUtil.java:308)
    at sun.security.krb5.internal.CredentialsUtil.acquireServiceCreds(CredentialsUtil.java:126)
    at sun.security.krb5.Credentials.acquireServiceCreds(Credentials.java:458)
    at sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:693)
    at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:248)
    at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
    at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:361)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:359)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:359)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:269)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:206)
    at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:81)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:474)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:412)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1006)
    at java.lang.Thread.run(Thread.java:748)
Caused by: KrbException: Identifier doesn't match expected value (906)
    at sun.security.krb5.internal.KDCRep.init(KDCRep.java:140)
    at sun.security.krb5.internal.TGSRep.init(TGSRep.java:65)
    at sun.security.krb5.internal.TGSRep.<init>(TGSRep.java:60)
    at sun.security.krb5.KrbTgsRep.<init>(KrbTgsRep.java:55)
    ... 22 more
2018-11-20 10:46:53.605 ERROR 8672 --- [| adminclient-4] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-4] Connection to node -1 failed authentication due to: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - UNKNOWN_SERVER)]) occurred when evaluating SASL token received from the Kafka Broker. This may be caused by Java's being unable to resolve the Kafka Broker's hostname correctly. You may want to try to adding '-Dsun.net.spi.nameservice.provider.1=dns,sun' to your client's JVMFLAGS environment. Users must configure FQDN of kafka brokers when authenticating using SASL and `socketChannel.socket().getInetAddress().getHostName()` must match the hostname in `principal/hostname@realm` Kafka Client will go to AUTHENTICATION_FAILED state.
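
One thing that may be relevant to the setup above: `java.security.krb5.conf` and `java.security.auth.login.config` are JVM-wide system properties, so when both server configurations call `System.setProperty`, the second call overwrites the first. Kafka clients also accept a per-client `sasl.jaas.config` property, which lets each client carry its own login entry. The following is only a minimal sketch of that idea, not a confirmed fix for the error shown. The class name, method names, host names, keytab paths, principals, and the `kafka` service name are all hypothetical placeholders, and a single `krb5.conf` would still have to describe every realm involved.

```java
import java.util.HashMap;
import java.util.Map;

public class TwoClusterAdminConfigs {

    /** Builds an inline JAAS entry for a Kerberos keytab login (hypothetical helper). */
    public static String krb5JaasConfig(String keytabPath, String principal) {
        return "com.sun.security.auth.module.Krb5LoginModule required "
                + "useKeyTab=true storeKey=true "
                + "keyTab=\"" + keytabPath + "\" "
                + "principal=\"" + principal + "\";";
    }

    /** Builds one self-contained property map per cluster, with its own JAAS entry. */
    public static Map<String, Object> adminConfig(String bootstrapServers,
                                                  String truststoreLocation,
                                                  String truststorePassword,
                                                  String keytabPath,
                                                  String principal) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", bootstrapServers);
        configs.put("security.protocol", "SASL_SSL");
        configs.put("sasl.mechanism", "GSSAPI");
        // Assumption: the brokers run under the "kafka" service principal.
        configs.put("sasl.kerberos.service.name", "kafka");
        configs.put("ssl.truststore.location", truststoreLocation);
        configs.put("ssl.truststore.password", truststorePassword);
        // Per-client login entry instead of the JVM-wide
        // java.security.auth.login.config system property.
        configs.put("sasl.jaas.config", krb5JaasConfig(keytabPath, principal));
        return configs;
    }

    public static void main(String[] args) {
        // All values below are placeholders for illustration only.
        Map<String, Object> server1 = adminConfig("broker1.example.com:9093",
                "/path/to/truststore1.jks", "changeit",
                "/path/to/client1.keytab", "client1@REALM1.EXAMPLE.COM");
        Map<String, Object> server2 = adminConfig("broker2.example.com:9093",
                "/path/to/truststore2.jks", "changeit",
                "/path/to/client2.keytab", "client2@REALM2.EXAMPLE.COM");

        // Each map could then back its own KafkaAdmin / AdminClient bean.
        System.out.println(server1.get("sasl.jaas.config"));
        System.out.println(server2.get("sasl.jaas.config"));
    }
}
```

Each map would be passed to its own `new KafkaAdmin(configs)` or `AdminClient.create(configs)` call, under distinct bean names, so neither client depends on whichever system property was set last.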
