java—如果与流中的上一个项有间隔,则发出一个新项

vxqlmq5t  于 2021-06-30  发布在  Java
关注(0)|答案(0)|浏览(231)

我有一个传出的消息流。它们可以任意间隔出现。如果在最后一条消息发送后的一段时间内没有消息,我想发出一条新消息,作为keep-alive或heartbeat。

下面是我尝试的代码示例。假设我想在“c”到“d”之间每隔1s发出一条心跳消息。

  1. Flux.concat(
  2. Flux.just("A", "B", "C").delayElements(Duration.ofMillis(500)),
  3. Flux.just("D").delaySequence(Duration.ofSeconds(5))
  4. )
  5. .windowTimeout(1, Duration.ofSeconds(1))
  6. .flatMap(window -> window.switchIfEmpty(Mono.just("*")))
  7. .log()
  8. .blockLast();

这是输出

  1. 14:30:14.659 [parallel-2] INFO reactor.Flux.FlatMap.1 - onNext(A)
  2. 14:30:15.162 [parallel-3] INFO reactor.Flux.FlatMap.1 - onNext(B)
  3. 14:30:15.663 [parallel-4] INFO reactor.Flux.FlatMap.1 - onNext(C)
  4. 14:30:16.664 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
  5. 14:30:17.665 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
  6. 14:30:18.664 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
  7. 14:30:19.670 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
  8. 14:30:20.665 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
  9. 14:30:20.676 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(D)
  10. 14:30:20.677 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*) // Why?
  11. 14:30:20.679 [parallel-1] INFO reactor.Flux.FlatMap.1 - onComplete()

在这个例子中,d跟在c后面5.013秒,即使我指定了5秒,所以我不介意中间是否有4到5个项目/心跳。不需要那么精确。
但是为什么在d之后还有一项被省略了呢?有办法解决吗?也许我用错了手术。
我想我可以用一个处理器来实现它,但是文档上说
大多数时候,你应该尽量避免使用处理器。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题