如何在webflux应用程序中使用spring云流?

icomxhvb  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(432)

我有一个基于webflux的微服务,它有一个简单的React式存储库:

public interface NotificationRepository extends ReactiveMongoRepository<Notification, ObjectId> {
    }

现在我想扩展这个微服务来使用来自kafka的事件消息。然后,此消息/事件将保存到数据库中。
对于Kafka的听众,我使用了spring cloud stream。我创建了一些简单的consumer,它工作得很好-我能够使用消息并将其保存到数据库中。

@Bean
    public Consumer<KStream<String, Event>> documents(NotificationRepository repository) {
        return input ->
                input.foreach((key, value) -> {
                    LOG.info("Received event, Key: {}, value: {}", key, value);
                    repository.save(initNotification(value)).subscribe();
                });
    }

但这是连接springcloudstream消费者和React式存储库的正确方法吗?我要打电话的时候好像不是这样 subscribe() 最后。
我阅读了springcloudstream文档(3.0.0版本),他们说

Native support for reactive programming - since v3.0.0 we no longer distribute spring-cloud-stream-reactive modules and instead relying on native reactive support provided by spring cloud function. For backward compatibility you can still bring spring-cloud-stream-reactive from previous versions.

在这个演示视频中,他们还提到了使用projectreactor的React式编程支持。所以我想有一种方法我就是不知道。你能告诉我怎么做对吗?
我很抱歉,如果这一切听起来太愚蠢,但我对springcloudstream和React式编程非常陌生,还没有找到很多文章来描述这一点。

23c0lvtd

23c0lvtd1#

只需使用通量作为消耗类型,如下所示:

@Bean
public Consumer<Flux<Message<Event>>> documents(NotificationRepository repository) {
    return input ->
            input
             .map(message-> /*map the necessary value like:*/ message.getPayload().getEventValue())
             .concatMap((value) -> repository.save(initNotification(value)))
             .subscribe();
}

如果你使用 Function 返回类型为空( Function<Flux<Message<Event>>, Mono<Void>> )框架可以自动订阅,而不是使用者。与 Consumer 您必须手动订阅,因为框架没有对流的引用。但是在 Consumer 如果您订阅的不是存储库,而是整个流,这是可以的。

相关问题