neo4j 如何实现wait/notify in flux,在Java中订阅

jtw3ybtb  于 2023-04-30  发布在  Java
关注(0)|答案(1)|浏览(123)

我刚开始用java函数式编程,遇到了一些困难。我正在编写一个方法来建立一个与数据库的React式会话,并将flux对象返回给调用者。然后调用者将订阅此通量并相应地获取结果。尝试模仿此示例here
我有的是

return  Flux.usingWhen(
         Mono.just(getDataStore().getRxSession()),
         session -> Flux.from(session.run(query).records()),
         RxSession::close);

然后是订阅这个通量的不同函数

Flux<Record> rflux =  query.sub();
rflux.takeUntil(//Implement wait and notifier here).subscribe(//notify here);
daupos2t

daupos2t1#

我在这里找到了答案:https://www.baeldung.com/reactor-core
我可以使用Subscription接口方法请求下一组记录。

Flux<Record> rflux =  query.sub();
rflux.subscribe(new Subscriber<Record>() {
private Subscription s;
@Override
public void onSubscribe(Subscription sub) {
    this.s = sub;
    s.request(reqParam);
}

@Override
public void onNext(Record record) {
    System.out.println("The record that is pulled is " + record.toString());
    s.request(reqParam);
}

@Override
public void onError(Throwable throwable) {

}

@Override
public void onComplete() {

}

});

相关问题