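I'm new to Kafka, and I want to use Spring Boot to publish messages to a topic on a Confluent Cloud cluster.
When I run my application, the topic is created successfully, and I can see it when I log in to my Confluent account.
However, Spring Boot logs the following exception, and I'm stuck at this point: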
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic livestreams1 not present in metadata after 60000 ms.
I have tried the answers to related questions, but none of them resolved it.
Here is my code.
**application.properties**
# Kafka
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.bootstrap.servers=pkc-l7j7w.asia-east1.gcp.confluent.cloud:9092
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='VG4WDA5V33J4MTT6' password='6I3RcFG9DGJsDuW09KgFWmeYmslB1A5S1DtQOaycuD8lDLipA6nNe6p2iFxq0GSH';
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.admin.client-id=lkc-67m0j
**Kafka Configuration class**
@Configuration
@NoArgsConstructor
public class KafkaConfiguration {

    @Value("${spring.kafka.properties.bootstrap.servers}")
    private String bootStrapServer;

    @Bean
    public ProducerFactory<Long, String> producerFactory() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory(properties);
    }

    @Bean
    public KafkaTemplate<Long, String> longStringKafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public NewTopic liveStreams1Topic() {
        String TOPIC = "livestreams1";
        return new NewTopic(TOPIC, 6, (short) 3);
    }
}
**Producer**
@Component
@RequiredArgsConstructor
public class Producer {

    @Autowired
    private KafkaTemplate<Long, String> kafkaTemplate;

    @EventListener(ApplicationStartedEvent.class) // this method will be invoked after application start
    public void produce() {
        String TOPIC = "livestreams1";
        for (long num = 1; num < 10; num++) {
            String message = "Livestreams data " + num;
            kafkaTemplate.send(TOPIC, num, message).addCallback(
                    new ListenableFutureCallback<SendResult<Long, String>>() {
                        @Override
                        public void onFailure(Throwable throwable) {
                            System.err.println("Not today");
                        }

                        @Override
                        public void onSuccess(SendResult<Long, String> result) {
                            if (result != null) {
                                int partition = result.getRecordMetadata().partition();
                                System.out.println("partition " + partition);
                                long offset = result.getRecordMetadata().offset();
                                System.out.println("offset " + offset);
                            }
                        }
                    });
            kafkaTemplate.flush();
        }
    }
}
**Bootstrap class**
@SpringBootApplication
@EnableKafka
public class Livestreams1Application {

    public static void main(String[] args) {
        SpringApplication.run(Livestreams1Application.class, args);
    }
}
**Logs**
2020-11-19 11:09:33.354 WARN 12153 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Bootstrap broker pkc-l7j7w.asia-east1.gcp.confluent.cloud:9092 (id: -1 rack: null) disconnected
2020-11-19 11:09:35.680 WARN 12153 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Bootstrap broker pkc-l7j7w.asia-east1.gcp.confluent.cloud:9092 (id: -1 rack: null) disconnected
2020-11-19 11:09:37.757 WARN 12153 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Bootstrap broker pkc-l7j7w.asia-east1.gcp.confluent.cloud:9092 (id: -1 rack: null) disconnected
2020-11-19 11:09:38.690 ERROR 12153 --- [ main] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='1' and payload='Livestreams data 1' to topic livestreams1:
org.apache.kafka.common.errors.TimeoutException: Topic livestreams1 not present in metadata after 60000 ms.
2020-11-19 11:09:38.696 INFO 12153 --- [ main] ConditionEvaluationReportLoggingListener :
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2020-11-19 11:09:38.718 ERROR 12153 --- [ main] o.s.boot.SpringApplication : Application run failed
org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic livestreams1 not present in metadata after 60000 ms.
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:574) ~[spring-kafka-2.6.3.jar:2.6.3]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:369) ~[spring-kafka-2.6.3.jar:2.6.3]
at com.wanfadger.kafka.livestreams1.producer.Producer.produce(Producer.java:26) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:312) ~[spring-context-5.3.1.jar:5.3.1]
at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:197) ~[spring-context-5.3.1.jar:5.3.1]
at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:160) ~[spring-context-5.3.1.jar:5.3.1]
at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:203) ~[spring-context-5.3.1.jar:5.3.1]
at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:196) ~[spring-context-5.3.1.jar:5.3.1]
at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:161) ~[spring-context-5.3.1.jar:5.3.1]
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:426) ~[spring-context-5.3.1.jar:5.3.1]
at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:383) ~[spring-context-5.3.1.jar:5.3.1]
at org.springframework.boot.context.event.EventPublishingRunListener.started(EventPublishingRunListener.java:105) ~[spring-boot-2.4.0.jar:2.4.0]
at org.springframework.boot.SpringApplicationRunListeners.lambda$started$5(SpringApplicationRunListeners.java:75) ~[spring-boot-2.4.0.jar:2.4.0]
at java.base/java.util.ArrayList.forEach(ArrayList.java:1510) ~[na:na]
at org.springframework.boot.SpringApplicationRunListeners.doWithListeners(SpringApplicationRunListeners.java:117) ~[spring-boot-2.4.0.jar:2.4.0]
at org.springframework.boot.SpringApplicationRunListeners.doWithListeners(SpringApplicationRunListeners.java:111) ~[spring-boot-2.4.0.jar:2.4.0]
at org.springframework.boot.SpringApplicationRunListeners.started(SpringApplicationRunListeners.java:75) ~[spring-boot-2.4.0.jar:2.4.0]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:332) ~[spring-boot-2.4.0.jar:2.4.0]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1309) ~[spring-boot-2.4.0.jar:2.4.0]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1298) ~[spring-boot-2.4.0.jar:2.4.0]
at com.wanfadger.kafka.livestreams1.Livestreams1Application.main(Livestreams1Application.java:14) ~[classes/:na]
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic livestreams1 not present in metadata after 60000 ms.
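
One possibility worth checking, offered as a hypothesis rather than a confirmed diagnosis: the `producerFactory()` bean in the configuration class sets only the bootstrap servers, linger time and serializers, so the `security.protocol` and `sasl.*` entries from application.properties may never reach the producer. The auto-configured admin client that creates the topic does receive them, which would explain why topic creation succeeds while the producer keeps logging "Bootstrap broker ... disconnected". Below is a minimal sketch of a property map that carries the security settings. It uses plain string config keys so it compiles without Kafka on the classpath; the class name `ConfluentProducerProps` and the `apiKey`/`apiSecret` parameters are illustrative assumptions, not part of the original code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: builds producer settings for a SASL_SSL cluster. */
final class ConfluentProducerProps {

    private ConfluentProducerProps() {
    }

    static Map<String, Object> build(String bootstrapServers, String apiKey, String apiSecret) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);

        // Security settings that the hand-built factory in the question omits.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username='" + apiKey + "' password='" + apiSecret + "';");

        // Same serializers and linger time as the original configuration.
        props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("linger.ms", 1);
        return props;
    }
}
```

The resulting map could be passed to `new DefaultKafkaProducerFactory<>(props)` in place of the `Properties` object. Alternatively, dropping the custom `producerFactory()` bean and relying on Spring Boot's auto-configured `KafkaTemplate` should pick up the `spring.kafka.properties.*` entries, with the serializers set through `spring.kafka.producer.key-serializer` and `spring.kafka.producer.value-serializer`.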