我有一个传出的消息流。它们可以任意间隔出现。如果在最后一条消息发送后的一段时间内没有消息,我想发出一条新消息,作为keep-alive或heartbeat。
下面是我尝试的代码示例。假设我想在“c”到“d”之间每隔1s发出一条心跳消息。
Flux.concat(
Flux.just("A", "B", "C").delayElements(Duration.ofMillis(500)),
Flux.just("D").delaySequence(Duration.ofSeconds(5))
)
.windowTimeout(1, Duration.ofSeconds(1))
.flatMap(window -> window.switchIfEmpty(Mono.just("*")))
.log()
.blockLast();
这是输出
14:30:14.659 [parallel-2] INFO reactor.Flux.FlatMap.1 - onNext(A)
14:30:15.162 [parallel-3] INFO reactor.Flux.FlatMap.1 - onNext(B)
14:30:15.663 [parallel-4] INFO reactor.Flux.FlatMap.1 - onNext(C)
14:30:16.664 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:17.665 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:18.664 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:19.670 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:20.665 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:20.676 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(D)
14:30:20.677 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*) // Why?
14:30:20.679 [parallel-1] INFO reactor.Flux.FlatMap.1 - onComplete()
在这个例子中,d跟在c后面5.013秒,即使我指定了5秒,所以我不介意中间是否有4到5个项目/心跳。不需要那么精确。
但是为什么在d之后还有一项被省略了呢?有办法解决吗?也许我用错了手术。
我想我可以用一个处理器来实现它,但是文档上说
大多数时候,你应该尽量避免使用处理器。
暂无答案!
目前还没有任何答案,快来回答吧!