java—在spring boot应用程序中实现React式kafka侦听器

vyswwuz2  于 2021-07-07  发布在  Java
关注(0)|答案(1)|浏览(483)

我正在尝试在我的spring boot应用程序中实现React式kafka consumer,我正在查看以下示例:https://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main/java/reactor/kafka/samples/samplescenarios.java
看来Kafka还不支持Spring
我理解kafka监听器在spring的非React式kafkaapi中是如何工作的:最简单的解决方案是为concurrentkafkalistenercontainerfactory和consumerfactory配置bean,然后使用@kafkalistener annotation和voila
但我不知道如何正确使用ReactKafka在Spring现在。
基本上我需要一个听众的主题。我应该创建自己的循环或调度程序吗?或者我错过了什么。有人能分享他们的知识和最佳实践吗?

hkmswyz6

hkmswyz61#

我还没有现成的解决方案,但我正在尝试这个(kotlin代码,spring boot)。有人在这里发布了部分代码片段https://github.com/reactor/reactor-kafka/issues/100

@EventListener(ApplicationStartedEvent::class)
fun onSomeEvent() {
    kafkaReceiver
        .receive()
        .doOnNext { record ->
            val myEvent = record.value()
            processMyEvent(myEvent).thenEmpty {
                record.receiverOffset().acknowledge()
            }
        }
        .doOnError {
            /* todo */
        }
        .subscribe()
}

查看其他堆栈溢出问题。那里不多,但也许会给你一些想法
使用onerrorresume处理使用React堆kafka发布到kafka的有问题的有效负载
反序列化异常后继续使用reactor kafka中的后续记录

相关问题