我在我的项目中使用SpringBootV2.2.4和ApacheKafka。
下面是我的 pom.xml
文件:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.200</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java</artifactId>
<version>3.1.5</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.11.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- <dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-annotations</artifactId>
<version>3.5.6-Final</version>
</dependency>
-->
</dependencies>
下面是我作为Kafka一部分的代码
@Configuration
public class KafkaConfig {
@Bean
public ProducerFactory<String,ServingDetailsEntity> producerFactoryServingDetail(){
Map<String,Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory(config);
}
@Bean
public ProducerFactory<String,String> producerFactory(){
Map<String,Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory(config);
}
@Bean
public KafkaTemplate<String, ServingDetailsEntity> kafkaTemplateItem(){
return new KafkaTemplate<String, ServingDetailsEntity>(producerFactoryServingDetail());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(){
return new KafkaTemplate<String, String>(producerFactory());
}
}
但当json消息被发送到kafka队列时,我得到了下面的错误
java.lang.NoSuchMethodError: org.apache.kafka.clients.producer.Producer.close(Ljava/time/Duration;)V
at org.springframework.kafka.core.KafkaTemplate.closeProducer(KafkaTemplate.java:382) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
at org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$4(KafkaTemplate.java:433) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:198) ~[kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:185) ~[kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:570) ~[kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:550) ~[kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:474) ~[kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:75) ~[kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:660) ~[kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) ~[kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:454) ~[kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:446) ~[kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224) ~[kafka-clients-0.11.0.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) ~[kafka-clients-0.11.0.0.jar:na]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_201]
不过,json消息正在到达队列,但我想了解为什么会出现上述错误
期待任何帮助
2条答案
按热度按时间zfciruhq1#
您使用的是旧版本的
kafka-clients
. 尝试使用较新的:axzmvihb2#
是的,正如gary提到的,如果您使用的是springboot,就让它来管理兼容的版本。
你也可以在这里找到兼容性矩阵https://spring.io/projects/spring-kafka