我试图创造一个简单的Kafka制片人。因为我对这个主题不熟悉,所以我遵循了一个教程。我按照视频中的建议创建了一个配置文件。这是我正在使用的配置文件。
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, KafkaProducerModel> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, KafkaProducerModel> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
当我尝试运行我的应用程序时,我遇到了以下错误。
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerGroupMetadata
at jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[?:?]
at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[?:?]
at java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[?:?]
at java.lang.Class.getDeclaredMethods0(Native Method) ~[?:?]
at java.lang.Class.privateGetDeclaredMethods(Class.java:3166) ~[?:?]
at java.lang.Class.getDeclaredMethods(Class.java:2309) ~[?:?]
at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:463) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.util.ReflectionUtils.doWithLocalMethods(ReflectionUtils.java:321) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.orm.jpa.support.PersistenceAnnotationBeanPostProcessor.buildPersistenceMetadata(PersistenceAnnotationBeanPostProcessor.java:438) ~[spring-orm-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.orm.jpa.support.PersistenceAnnotationBeanPostProcessor.findPersistenceMetadata(PersistenceAnnotationBeanPostProcessor.java:409) ~[spring-orm-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.orm.jpa.support.PersistenceAnnotationBeanPostProcessor.postProcessMergedBeanDefinition(PersistenceAnnotationBeanPostProcessor.java:336) ~[spring-orm-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyMergedBeanDefinitionPostProcessors(AbstractAutowireCapableBeanFactory.java:1094) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:569) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
... 15 more
这是一个gradle项目,这里是依赖关系
implementation 'org.springframework.kafka:spring-kafka:2.5.6.RELEASE'
我在查看了stackoverflow线程之后添加了依赖项。但是它没有解决这个错误。任何线索都会有帮助。
implementation 'org.apache.tomcat.embed:tomcat-embed-core'
1条答案
按热度按时间e3bfsja21#
你似乎有错误的版本
kafka-clients
在类路径上。SpringKafka 2.5.6使用kafka客户端2.5.1。您不应该自己声明版本,SpringBoot将引入正确的版本。SpringBoot2.3.4将引入这个版本的SpringKafka及其依赖项。如果您使用的是旧的引导,则需要覆盖所有依赖项。