webflux和reactor 3.4.0-不推荐的fluxprocessors-如何使用接收器订阅?

xam8gpfp  于 2021-07-03  发布在  Java
关注(0)|答案(0)|浏览(435)

在reactor 3.4.0中,不同的fluxProcessor(如“directprocessor”)越来越不受欢迎。我使用这种处理器作为订户,参见下面的示例。
现在我想知道如何迁移代码以使用推荐的 Sinks.many() 接近?有什么想法吗?
旧代码:

DirectProcessor<String> output = DirectProcessor.create();
output.subscribe(msg -> System.out.println(msg));

WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(uri, session -> 
    // send message
    session.send(Mono.just(session.textMessage(command)))
        .thenMany(session.receive()
        .map(message -> message.getPayloadAsText())
        .subscribeWith(output))
    .then()).block();

根据不推荐使用的directprocessor的javadoc,我应该使用它 Sinks.many().multicast().directBestEffort() . 但是我想知道如何在我的websocketclient中使用它?
迁移的代码:

Many<String> sink = Sinks.many().multicast().directBestEffort();        
Flux<String> flux = sink.asFlux();
flux.subscribe(msg -> System.out.println(msg));

WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(uri, session -> 
    // send message
    session.send(Mono.just(session.textMessage(command)))
        .thenMany(session.receive()
        .map(message -> message.getPayloadAsText())
        .subscribe ...   // <-- how to do this with a Sink ??
    .then()).block();

提前谢谢你的建议。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题