java—如何使用SpringWebFlux构建React式内存存储库?

sz81bmfz  于 2021-06-27  发布在  Java
关注(0)|答案(2)|浏览(381)

我正在尝试实现一个React式内存存储库。这应该如何实现?
这是我要做的一个阻止版本

@Repository
@AllArgsConstructor
public class InMemEventRepository implements EventRepository {

    private final List<Event> events;

    @Override
    public void save(final Mono<Event> event) {
        events.add(event.block());
        // event.subscribe(events::add); <- does not do anything
    }

    @Override
    public Flux<Event> findAll() {
        return Flux.fromIterable(events);
    }

}

我试过用 event.subscribe(events::add); 但是这个事件没有被添加到列表中(也许我遗漏了什么?)
也许 events 应为类型 Flux<Event> 还有一些方法可以添加 Mono<Event>Flux<Event> ?

iq0todco

iq0todco1#

为此我建议用Flume。

public static class InMemEventRepository {
    private final Scheduler serializerScheduler = Schedulers.single();

    private final Sinks.Many<Event> events = Sinks.many().replay().all();

    public void save(Mono<Event> event) {
      event
          .publishOn(serializerScheduler) // If event will be published on multiple threads you need to serialize them
          .subscribe(x -> events.emitNext(x, EmitFailureHandler.FAIL_FAST)); 
    }

    public Flux<Event> findAll() {
      return events.asFlux();
    }
  }

这是3.4号React堆。对于较旧的版本,您可以使用处理器,但它们现在已被弃用。汇通常更易于使用,但它们不会序列化来自多个线程的发射。这就是为什么我使用调度程序。
另请参阅此答案,以了解序列化汇排放的替代方法

shstlldc

shstlldc2#

如果你选择 Flux.fromIterable ,您将只获得以前事件的订阅,但您将失去未来事件的订阅
我以前做过一次poc,想得到类似的效果,你可以检查一下https://github.com/albertosh/keepmeupdated
其主要思想是要有一个中心点,在该点上事件发生并订阅存储库。无论何时订阅 findAll ,你会得到无限的 List<Item> . 任何保存的项目都将触发一个新事件,并且任何人都将订阅该事件 findAll 我会明白的
请注意,此repo使用的是rxjava,因此可能需要一些端口到reactor

相关问题