Topic livestreams1 not present in metadata after 60000 ms

Posted by zte4gxcn on 2021-06-04 in Kafka

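I am new to Kafka, and I want to use Spring Boot to publish messages to a topic on a Confluent Cloud cluster.
When I run my application, the topic is created successfully, and I can see it when I log in to my Confluent account.
However, Spring Boot logs the following exception, and I am stuck at this point: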

Caused by: org.apache.kafka.common.errors.TimeoutException: Topic livestreams1 not present in metadata after 60000 ms.
I have tried the answers to related questions, but none of them resolved it.

Here is my code.

**application.properties**


# Kafka

spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.bootstrap.servers=pkc-l7j7w.asia-east1.gcp.confluent.cloud:9092
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username='VG4WDA5V33J4MTT6'   password='6I3RcFG9DGJsDuW09KgFWmeYmslB1A5S1DtQOaycuD8lDLipA6nNe6p2iFxq0GSH';
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.admin.client-id=lkc-67m0j

**Kafka Configuration class**

@Configuration
@NoArgsConstructor
public class KafkaConfiguration {

    @Value("${spring.kafka.properties.bootstrap.servers}")
    private String bootStrapServer;

    @Bean
    public ProducerFactory<Long , String> producerFactory(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG , bootStrapServer);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , LongSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer.class);
        return new DefaultKafkaProducerFactory(properties);
    }

    @Bean
    public KafkaTemplate<Long , String> longStringKafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public NewTopic liveStreams1Topic(){
        String TOPIC="livestreams1";
        return new NewTopic(TOPIC , 6 , (short) 3);
    }

}

Producer

@Component
@RequiredArgsConstructor
public class Producer {

    @Autowired
    private KafkaTemplate<Long, String> kafkaTemplate;

    // Invoked once the application has started.
    @EventListener(ApplicationStartedEvent.class)
    public void produce(){
        String TOPIC="livestreams1";
        for (long num = 1; num < 10; num++) {
            String message = "Livestreams data " + num;
            kafkaTemplate.send(TOPIC, num, message).addCallback(
                    new ListenableFutureCallback<SendResult<Long, String>>() {
                        @Override
                        public void onFailure(Throwable throwable) {
                            System.err.println("Not today");
                        }

                        @Override
                        public void onSuccess(SendResult<Long, String> result) {
                            if (result != null) {
                                int partition = result.getRecordMetadata().partition();
                                System.out.println("partition " + partition);
                                long offset = result.getRecordMetadata().offset();
                                System.out.println("offset " + offset);
                            }
                        }
                    });
            kafkaTemplate.flush();
        }
    }

}

Bootstrap class

@SpringBootApplication
@EnableKafka
public class Livestreams1Application {

    public static void main(String[] args) {

        SpringApplication.run(Livestreams1Application.class, args);
    }

}

Logs

2020-11-19 11:09:33.354  WARN 12153 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker pkc-l7j7w.asia-east1.gcp.confluent.cloud:9092 (id: -1 rack: null) disconnected
2020-11-19 11:09:35.680  WARN 12153 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker pkc-l7j7w.asia-east1.gcp.confluent.cloud:9092 (id: -1 rack: null) disconnected
2020-11-19 11:09:37.757  WARN 12153 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker pkc-l7j7w.asia-east1.gcp.confluent.cloud:9092 (id: -1 rack: null) disconnected
2020-11-19 11:09:38.690 ERROR 12153 --- [           main] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='1' and payload='Livestreams data 1' to topic livestreams1:

org.apache.kafka.common.errors.TimeoutException: Topic livestreams1 not present in metadata after 60000 ms.

2020-11-19 11:09:38.696  INFO 12153 --- [           main] ConditionEvaluationReportLoggingListener : 

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2020-11-19 11:09:38.718 ERROR 12153 --- [           main] o.s.boot.SpringApplication               : Application run failed

org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic livestreams1 not present in metadata after 60000 ms.
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:574) ~[spring-kafka-2.6.3.jar:2.6.3]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:369) ~[spring-kafka-2.6.3.jar:2.6.3]
    at com.wanfadger.kafka.livestreams1.producer.Producer.produce(Producer.java:26) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
    at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:312) ~[spring-context-5.3.1.jar:5.3.1]
    at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:197) ~[spring-context-5.3.1.jar:5.3.1]
    at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:160) ~[spring-context-5.3.1.jar:5.3.1]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:203) ~[spring-context-5.3.1.jar:5.3.1]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:196) ~[spring-context-5.3.1.jar:5.3.1]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:161) ~[spring-context-5.3.1.jar:5.3.1]
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:426) ~[spring-context-5.3.1.jar:5.3.1]
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:383) ~[spring-context-5.3.1.jar:5.3.1]
    at org.springframework.boot.context.event.EventPublishingRunListener.started(EventPublishingRunListener.java:105) ~[spring-boot-2.4.0.jar:2.4.0]
    at org.springframework.boot.SpringApplicationRunListeners.lambda$started$5(SpringApplicationRunListeners.java:75) ~[spring-boot-2.4.0.jar:2.4.0]
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1510) ~[na:na]
    at org.springframework.boot.SpringApplicationRunListeners.doWithListeners(SpringApplicationRunListeners.java:117) ~[spring-boot-2.4.0.jar:2.4.0]
    at org.springframework.boot.SpringApplicationRunListeners.doWithListeners(SpringApplicationRunListeners.java:111) ~[spring-boot-2.4.0.jar:2.4.0]
    at org.springframework.boot.SpringApplicationRunListeners.started(SpringApplicationRunListeners.java:75) ~[spring-boot-2.4.0.jar:2.4.0]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:332) ~[spring-boot-2.4.0.jar:2.4.0]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1309) ~[spring-boot-2.4.0.jar:2.4.0]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1298) ~[spring-boot-2.4.0.jar:2.4.0]
    at com.wanfadger.kafka.livestreams1.Livestreams1Application.main(Livestreams1Application.java:14) ~[classes/:na]
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic livestreams1 not present in metadata after 60000 ms.
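
A possible cause, offered as a guess from the code and log above rather than a confirmed diagnosis: the topic is created by Spring Boot's auto-configured admin client, which picks up the `spring.kafka.properties.*` entries, whereas the `producerFactory()` bean is built from only the bootstrap server, linger, and serializer settings. If that is what happens, the producer connects without `security.protocol`, `sasl.mechanism`, and `sasl.jaas.config`, which would be consistent with the repeated "Bootstrap broker ... disconnected" warnings before the timeout. The sketch below uses only the JDK to show a producer config map that carries the security settings as well. The class name `ProducerConfigSketch` and its method names are hypothetical; the key strings are the standard Kafka client property names.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch: one producer config map that carries the security settings
 * alongside the serializers. Class and method names are hypothetical.
 */
final class ProducerConfigSketch {

    private ProducerConfigSketch() {}

    /** Builds a producer config that includes the SASL_SSL settings. */
    static Map<String, Object> producerConfig(String bootstrapServers,
                                              String apiKey,
                                              String apiSecret) {
        Map<String, Object> config = new HashMap<>();
        // Settings already present in the question's producerFactory().
        config.put("bootstrap.servers", bootstrapServers);
        config.put("linger.ms", 1);
        config.put("key.serializer",
                "org.apache.kafka.common.serialization.LongSerializer");
        config.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Settings that the hand-built factory in the question does not set.
        config.put("security.protocol", "SASL_SSL");
        config.put("sasl.mechanism", "PLAIN");
        config.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username='" + apiKey + "' password='" + apiSecret + "';");
        return config;
    }

    /** Returns true when the map carries the keys needed for SASL_SSL. */
    static boolean hasSaslSslSettings(Map<String, Object> config) {
        return "SASL_SSL".equals(config.get("security.protocol"))
                && config.containsKey("sasl.mechanism")
                && config.containsKey("sasl.jaas.config");
    }
}
```

In the configuration class from the question, a map like this could be passed to `new DefaultKafkaProducerFactory<>(config)` in place of the `Properties` object. The sketch assumes the credentials are read from `application.properties` and passed in as arguments rather than hard-coded.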
