在reactor 3.4.0中,不同的fluxProcessor(如“directprocessor”)越来越不受欢迎。我使用这种处理器作为订户,参见下面的示例。
现在我想知道如何迁移代码以使用推荐的 Sinks.many()
接近?有什么想法吗?
旧代码:
DirectProcessor<String> output = DirectProcessor.create();
output.subscribe(msg -> System.out.println(msg));
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(uri, session ->
// send message
session.send(Mono.just(session.textMessage(command)))
.thenMany(session.receive()
.map(message -> message.getPayloadAsText())
.subscribeWith(output))
.then()).block();
根据不推荐使用的directprocessor的javadoc,我应该使用它 Sinks.many().multicast().directBestEffort()
. 但是我想知道如何在我的websocketclient中使用它?
迁移的代码:
Many<String> sink = Sinks.many().multicast().directBestEffort();
Flux<String> flux = sink.asFlux();
flux.subscribe(msg -> System.out.println(msg));
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(uri, session ->
// send message
session.send(Mono.just(session.textMessage(command)))
.thenMany(session.receive()
.map(message -> message.getPayloadAsText())
.subscribe ... // <-- how to do this with a Sink ??
.then()).block();
提前谢谢你的建议。
暂无答案!
目前还没有任何答案,快来回答吧!