在Couchbase中阻止Upsert

mrphzbgm  于 2022-10-01  发布在  Hbase
关注(0)|答案(3)|浏览(326)

我正在编写一个测试,在实际运行任何测试之前,我需要在我的Couchbase存储桶中插入一些文档(比如10个)。所以我有一个用@BeForeAll注解的方法,它试图颠覆这些文档。现在,当我尝试运行测试时,测试失败了,因为那时文档还没有持久化。为了等待这些文档被插入,我正在做这样的事情-

Flux.fromIterable(couchDocs)
        .map(couchDoc -> bucket.upsert(couchDoc, persistTo)
        .collectList()
        .block();

但是,当我运行测试时,我仍然可以看到文档到那时还没有持久化,Assert失败。我是不是漏掉了什么?

webghufk

webghufk1#

使用.flatMap而不是.map。你的内在流没有被订阅。

hs1ihplo

hs1ihplo2#

在使用块()将项目插入到Couchbase中,然后在没有项目的情况下执行GET时,我遇到了同样的问题。似乎在块()之后到upsert有一段很短的时间,Get才能找到项目。这不是应该完成的块()的预期行为。

如果我这么做了

创建

线程。睡眠(5000)

获取项目

它工作得很好。

zpjtge22

zpjtge223#

我认为您可能需要将此添加到您的查询中

它基本上是等到Couchbase上的所有索引在内部完成后才继续查询。它修复了我的问题,即我进行了批量更新,然后尝试执行GET Items,但返回的行数不一致(只需根据已在内部完全解析的Couchbase索引的数量)。令人讨厌的症状,但这是你的荣耀之路!

QueryOptions.queryOptions().scanConsistency(QueryScanConsistency.REQUEST_PLUS)

相关问题