java 订阅空Flux会导致无限循环

tjvv9vkg  于 2023-01-16  发布在  Java
关注(0)|答案(1)|浏览(254)

我有一个调用链,看起来像这样:

public ResponseEntity<Flux<Entity>> create() {
    return ResponseEntity.ok(saveList());
}

public Flux<Entity> saveList() {
    if(list.isEmpty()) return Flux.empty();
    return repository.saveAll(list);
}

无论是直接返回Flux.empty()还是将其发送给数据库调用,如果我在create()链上添加一个.map()调用,它都不会执行,因为没有元素,正如我们所期望的那样;然而,如果我添加一个.switchIfEmpty(),这个 also 也不会执行;无论我为一个空Flux尝试了什么处理程序组合,它总是导致一个无限循环。我想是因为它在等待元素的加入,这样它才能发射它。
文档建议Flux.empty()应该在不发出任何内容的情况下完成,但是记录Flux显示只调用了onSubscribe()request(),因此它没有完成,这就是为什么也没有调用. switchIfEmpty()
我需要做什么才能在订阅时完成一个空通量?
任何非零大小的Flux都可以成功,但是我尝试了许多不同的空Flux类型,比如来自数组的类型,它们都失败了。
我为switchIfEmpty()使用了一个硬编码的非空Publisher,因为这似乎意味着这是绝对必要的。

fhity93d

fhity93d1#

由于我没有收到答复 (像往常一样,我可能会补充),我已经创建了一个变通方案。
首先,我在一个列表中收集响应,然后使用Flux.error(),最后在调用者中使用onErrorComplete()完成执行。

public Flux<ListItem> create(List<ListItem> originalList) {
    return repository.get()
    .collectList()
    .flatMapMany(list -> {
        if(list.isEmpty()) return Flux.error(new IllegalStateException());
        return repository.saveAll(list);
    })
    .switchIfEmpty(repository.saveAll(originalList)
    .onErrorComplete();

由于我的flatMapMany()依赖于初始数据库调用,因此我额外使用switchIfEmpty()来捕获任何响应为空的情况,这不一定适合每个用例,但可能与可能导致Flux为空的问题类型相关。

相关问题