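Running Spring Boot 3.1.0-M2 with reactor-kafka 1.2.2.RELEASE produces the following when an error occurs. Which version is compatible with 3.1.0-M2?
I am trying to create a reactive application that consumes from Kafka and writes to an MS SQL database using R2DBC.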
2023-04-03T08:28:19.934-06:00 ERROR 22144 --- [event-tracker-1] reactor.core.scheduler.Schedulers : KafkaScheduler worker in group main failed with an uncaught exception
java.lang.NoSuchMethodError: 'void org.apache.kafka.clients.consumer.Consumer.close(long, java.util.concurrent.TimeUnit)'
at reactor.kafka.receiver.internals.DefaultKafkaReceiver$CloseEvent.run(DefaultKafkaReceiver.java:685) ~[reactor-kafka-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at reactor.kafka.receiver.internals.DefaultKafkaReceiver.doEvent(DefaultKafkaReceiver.java:401) ~[reactor-kafka-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$start$14(DefaultKafkaReceiver.java:335) ~[reactor-kafka-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.5.4.jar:3.5.4]
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440) ~[reactor-core-3.5.4.jar:3.5.4]
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527) ~[reactor-core-3.5.4.jar:3.5.4]
at reactor.kafka.receiver.internals.KafkaSchedulers$EventScheduler.lambda$decorate$1(KafkaSchedulers.java:100) ~[reactor-kafka-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) ~[reactor-core-3.5.4.jar:3.5.4]
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) ~[reactor-core-3.5.4.jar:3.5.4]
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
From pom.xml:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.0-M2</version>
    <relativePath/>
</parent>
<properties>
    <java.version>17</java.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-r2dbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>com.microsoft.sqlserver</groupId>
        <artifactId>mssql-jdbc</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>io.r2dbc</groupId>
        <artifactId>r2dbc-mssql</artifactId>
        <version>1.0.0.RELEASE</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>io.projectreactor.kafka</groupId>
        <artifactId>reactor-kafka</artifactId>
        <version>1.2.2.RELEASE</version>
    </dependency>
</dependencies>
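Below is a snippet of the consumer service. I receive a record, check whether it is valid, then check whether it already exists in the DB, and finally save the record. I have tried several onError configurations. The exception above seems to take precedence, so the error is never caught.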
@Transactional
Flux<EventInterface> consumeEventDTO() {
    return reactiveKafkaConsumerTemplate
            .receiveAutoAck()
            .delayElements(Duration.ofSeconds(2L)) // BACKPRESSURE
            .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                    consumerRecord.key(),
                    consumerRecord.value(),
                    consumerRecord.topic(),
                    consumerRecord.offset())
            )
            .map(ConsumerRecord::value)
            .flatMap(this::exists)
            .flatMap(this::save)
            .onErrorComplete(UncategorizedR2dbcException.class)
            .doOnNext(event -> {
                log.info("successfully consumed {}={}", EventInterface.class.getSimpleName(), event);
            })
            .doOnError(throwable -> log.error("something bad happened while consuming : {}", throwable.getMessage()));
}
1 Answer
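There is no spring-kafka release for Spring Boot 3.1.0-M* yet.
reactor-kafka:1.2.2.RELEASE is several years old. spring-kafka:3.0.5 should be used with reactor-kafka:1.3.17, and it also brings in the Kafka clients and streams libraries at version 3.3.2.
The error you are seeing comes from conflicting kafka-clients versions: reactor-kafka:1.2.2.RELEASE uses kafka-clients:2.0.0, not the newer version that your Spring starter is probably pulling in. The NoSuchMethodError in your stack trace shows exactly this: the Consumer.close(long, TimeUnit) overload that reactor-kafka 1.2.2 calls does not exist in the kafka-clients jar on your runtime classpath.
Also, Spring Boot parent -M* versions are not "stable"; they are pre-release milestones. At the time of writing, the latest stable version is 3.0.5, which you can find on the docs page (it matches the spring-kafka version mentioned above).
If all of this dependency matching is confusing, vertx-kafka-client has a simpler dependency tree: https://vertx.io/docs/vertx-kafka-client/java/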
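As a rough illustration of the alignment described above, the relevant parts of the pom.xml might look like the following. This is only a sketch: it uses the version numbers named in this answer (3.0.5 and 1.3.17), assumes that the 3.0.5 parent manages the spring-kafka and kafka-clients versions as it does in the question's pom, and leaves out the unrelated dependencies.

```xml
<!-- Sketch only: version numbers are the ones named in the answer, not independently verified here -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <!-- stable release instead of the 3.1.0-M2 milestone -->
    <version>3.0.5</version>
    <relativePath/>
</parent>

<dependencies>
    <!-- no explicit version: assumed to be managed by the Spring Boot parent -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
    <!-- replaces 1.2.2.RELEASE, which was built against kafka-clients 2.0.0 -->
    <dependency>
        <groupId>io.projectreactor.kafka</groupId>
        <artifactId>reactor-kafka</artifactId>
        <version>1.3.17</version>
    </dependency>
</dependencies>
```

Running mvn dependency:tree -Dincludes=org.apache.kafka afterwards should show which kafka-clients version is actually resolved, which is a quick way to check whether a conflict remains.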