Reusing the AdminClient bean in Kafka Spring Boot

gopyfrb3  posted on 2022-09-21  in  Kafka

To connect to Kafka with an admin client on port 9094, I have the following two main classes, together with the application.yml shown below:

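package org.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.web.client.RestTemplate;

@EnableKafka
@SpringBootApplication
@ConfigurationPropertiesScan
public class KafkaSpringApiApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaSpringApiApplication.class, args);
    }

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}
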
package org.kafka.config;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.kafka.reader.Kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

@Component
public class AdminConfigurer {

    @Autowired
    private Kafka kafkaConfig;

    @Bean
    public Map<String, Object> kafkaAdminProperties() {
        final Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
        if(kafkaConfig.getProperties().getSasl().getEnabled() && kafkaConfig.getSsl().getEnabled()) {
            configs.put("sasl.mechanism", kafkaConfig.getProperties().getSasl().getMechanism());
            configs.put("security.protocol", kafkaConfig.getProperties().getSasl().getSecurity().getProtocol());
            configs.put("ssl.keystore.location", kafkaConfig.getSsl().getKeystoreLocation());
            configs.put("ssl.keystore.password", kafkaConfig.getSsl().getKeystorePassword());
            configs.put("ssl.truststore.location", kafkaConfig.getSsl().getTruststoreLocation());
            configs.put("ssl.truststore.password", kafkaConfig.getSsl().getTruststorePassword());
            configs.put("sasl.jaas.config", String.format(kafkaConfig.getJaasTemplate(),
                    kafkaConfig.getProperties().getSasl().getJaas().getConfig().getUsername(),
                    kafkaConfig.getProperties().getSasl().getJaas().getConfig().getPassword()));
            configs.put("ssl.endpoint.identification.algorithm", "");
        }
        return configs;
    }

    @Bean
    public AdminClient getClient() {
        return AdminClient.create(kafkaAdminProperties());
    }

}

application.yml

kafka:
  bootstrap-servers: localhost:9094
  properties:
    sasl:
      enabled: false
      jaas:
        config:
          username: XXX
          password: XXX
      mechanism: SCRAM-SHA-256
      security:
        protocol: SASL_SSL
  ssl:
    enabled: false
    truststore-location: /opt/app/secrets/kafka.consumer.truststore.jks
    truststore-password: XXX
    keystore-location: /opt/app/secrets/kafka.consumer.keystore.jks
    keystore-password: XXX
    key-password: XXX

Service class

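import org.apache.kafka.clients.admin.AdminClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.concurrent.ExecutionException;

// Topic and ServiceHelper are the asker's own classes; they are not shown in the question.
@Service
public class TopicService {

    private static final Logger LOG = LoggerFactory.getLogger(TopicService.class);

    @Autowired
    private AdminClient adminClient;

    public void createTopic(Topic topic) throws ExecutionException, InterruptedException {
        // Blocks until the broker confirms that the topic was created (or the call fails).
        adminClient
                .createTopics(Collections.singletonList(ServiceHelper.fromTopic(topic)))
                .values()
                .get(topic.getName())
                .get();
    }
}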

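The problem I see in the logs is that it initially uses the admin client configuration and connects to Kafka on port 9094 correctly, but when I try to create a topic it uses port 9092 instead.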

Logs

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.7.1)

2022-09-07 15:40:20.144  INFO 71648 --- [  restartedMain] org.kafka.KafkaSpringApiApplication      : Starting KafkaSpringApiApplication using Java 18.0.2.1 on DESKTOP-JT3FAGV with PID 71648 (C:UsersutkarshDesktopgitprojectskafka-spring-apitargetclasses started by utkarsh in C:UsersutkarshDesktopgitprojectskafka-spring-api)
2022-09-07 15:40:20.145  INFO 71648 --- [  restartedMain] org.kafka.KafkaSpringApiApplication      : No active profile set, falling back to 1 default profile: "default"
2022-09-07 15:40:20.245  INFO 71648 --- [  restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : Devtools property defaults active! Set 'spring.devtools.add-properties' to 'false' to disable
2022-09-07 15:40:20.245  INFO 71648 --- [  restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : For additional web related logging consider setting the 'logging.level.web' property to 'DEBUG'
2022-09-07 15:40:21.593  INFO 71648 --- [  restartedMain] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data JPA repositories in DEFAULT mode.
2022-09-07 15:40:21.675  INFO 71648 --- [  restartedMain] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 71 ms. Found 1 JPA repository interfaces.
2022-09-07 15:40:22.660  INFO 71648 --- [  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8080 (http)
2022-09-07 15:40:22.673  INFO 71648 --- [  restartedMain] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2022-09-07 15:40:22.674  INFO 71648 --- [  restartedMain] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.64]
2022-09-07 15:40:22.860  INFO 71648 --- [  restartedMain] o.a.c.c.C.[Tomcat].[localhost].[/kafka]  : Initializing Spring embedded WebApplicationContext
2022-09-07 15:40:22.860  INFO 71648 --- [  restartedMain] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 2613 ms
2022-09-07 15:40:23.359  INFO 71648 --- [  restartedMain] o.hibernate.jpa.internal.util.LogHelper  : HHH000204: Processing PersistenceUnitInfo [name: default]
2022-09-07 15:40:23.413  INFO 71648 --- [  restartedMain] org.hibernate.Version                    : HHH000412: Hibernate ORM core version 5.6.9.Final
2022-09-07 15:40:23.621  INFO 71648 --- [  restartedMain] o.hibernate.annotations.common.Version   : HCANN000001: Hibernate Commons Annotations {5.1.2.Final}
2022-09-07 15:40:23.798  INFO 71648 --- [  restartedMain] org.hibernate.dialect.Dialect            : HHH000400: Using dialect: org.hibernate.dialect.PostgreSQL10Dialect
2022-09-07 15:40:24.398  INFO 71648 --- [  restartedMain] o.h.e.t.j.p.i.JtaPlatformInitiator       : HHH000490: Using JtaPlatform implementation: [org.hibernate.engine.transaction.jta.platform.internal.NoJtaPlatform]
2022-09-07 15:40:24.407  INFO 71648 --- [  restartedMain] j.LocalContainerEntityManagerFactoryBean : Initialized JPA EntityManagerFactory for persistence unit 'default'
2022-09-07 15:40:33.166  INFO 71648 --- [  restartedMain] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
    bootstrap.servers = [localhost:9094]
    client.dns.lookup = use_all_dns_ips
    client.id = 
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism = GSSAPI
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

2022-09-07 15:40:47.487  INFO 71648 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.1.1
2022-09-07 15:40:47.489  INFO 71648 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 97671528ba54a138
2022-09-07 15:40:47.489  INFO 71648 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1662545447486
---
---
2022-09-07 15:40:50.300  INFO 71648 --- [  restartedMain] o.s.s.web.DefaultSecurityFilterChain     : Will secure any request with [org.springframework.security.web.session.DisableEncodeUrlFilter@265d12d3, org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter@14d64eab, org.springframework.security.web.context.SecurityContextPersistenceFilter@1f2465a0, org.springframework.security.web.header.HeaderWriterFilter@2502a969, org.springframework.security.web.authentication.logout.LogoutFilter@6d4146f2, org.springframework.security.web.savedrequest.RequestCacheAwareFilter@4e059749, org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter@41c12f62, org.springframework.security.web.authentication.AnonymousAuthenticationFilter@eb03b, org.springframework.security.web.session.SessionManagementFilter@7e0bcff7, org.springframework.security.web.access.ExceptionTranslationFilter@19366e29, org.springframework.security.web.access.intercept.FilterSecurityInterceptor@234a3846]
2022-09-07 15:40:50.386  INFO 71648 --- [  restartedMain] o.s.b.d.a.OptionalLiveReloadServer       : LiveReload server is running on port 35729
2022-09-07 15:40:50.429  INFO 71648 --- [  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path '/kafka'
2022-09-07 15:40:50.449  INFO 71648 --- [  restartedMain] org.kafka.KafkaSpringApiApplication      : Started KafkaSpringApiApplication in 31.049 seconds (JVM running for 32.141)
2022-09-07 15:40:56.292  INFO 71648 --- [nio-8080-exec-2] o.a.c.c.C.[Tomcat].[localhost].[/kafka]  : Initializing Spring DispatcherServlet 'dispatcherServlet'
2022-09-07 15:40:56.292  INFO 71648 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2022-09-07 15:40:56.293  INFO 71648 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
2022-09-07 15:41:05.008  INFO 71648 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Node 1 disconnected.
2022-09-07 15:41:05.009  WARN 71648 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2022-09-07 15:41:05.055  INFO 71648 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Node 1 disconnected.
2022-09-07 15:41:05.056  WARN 71648 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2022-09-07 15:41:05.165  INFO 71648 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Node 1 disconnected.
2022-09-07 15:41:05.165  WARN 71648 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2022-09-07 15:41:05.413  INFO 71648 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Node 1 disconnected.

I don't understand why it is no longer using the bean to perform admin tasks.

sqxo8psd #1

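The AdminClient bean is still being used. bootstrap-servers is only used for the initial connection; after that the client connects to the address the broker advertises in its metadata, which is why the log shows node 1 at localhost:9092. In your Kafka server properties, make sure the advertised listener is defined on the correct port: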

listeners=SASL_SSL://:9094
advertised.listeners=SASL_SSL://localhost:9094

https://www.confluent.io/blog/kafka-listeners-explained/
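
To make that check concrete, here is a minimal sketch in plain Java. `AdvertisedListenerCheck` and its two methods are hypothetical helpers written for this answer; they are not part of Kafka or Spring, they only compare strings, and they never contact a broker. The sketch assumes a single local broker like the one in the question, where the bootstrap address is expected to be one of the advertised addresses.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not a Kafka or Spring class.
final class AdvertisedListenerCheck {

    // Parses a value such as "SASL_SSL://localhost:9094,PLAINTEXT://localhost:9092"
    // into a map of listener name -> "host:port".
    static Map<String, String> parse(String advertisedListeners) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : advertisedListeners.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int sep = trimmed.indexOf("://");
            if (sep < 0) {
                throw new IllegalArgumentException(
                        "Expected NAME://host:port but got: " + trimmed);
            }
            result.put(trimmed.substring(0, sep), trimmed.substring(sep + 3));
        }
        return result;
    }

    // True if the client's bootstrap address is one of the advertised addresses.
    static boolean isAdvertised(String advertisedListeners, String bootstrapServer) {
        return parse(advertisedListeners).containsValue(bootstrapServer.trim());
    }

    public static void main(String[] args) {
        String bootstrap = "localhost:9094"; // value from the question's application.yml

        // A broker that only advertises 9092 sends the client there after bootstrap.
        System.out.println(isAdvertised("PLAINTEXT://localhost:9092", bootstrap));

        // With the advertised listener on 9094, later requests stay on 9094.
        System.out.println(isAdvertised("SASL_SSL://localhost:9094", bootstrap));
    }
}
```

The protocol of the listener also has to match the client's security.protocol. The AdminClientConfig dump in the question shows security.protocol = PLAINTEXT (sasl.enabled and ssl.enabled are both false), so with that client configuration the listener on 9094 would probably need to be a PLAINTEXT listener rather than SASL_SSL.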
