我正在编写一个测试,在实际运行任何测试之前,我需要在我的Couchbase存储桶中插入一些文档(比如10个)。所以我有一个用@BeForeAll注解的方法,它试图颠覆这些文档。现在,当我尝试运行测试时,测试失败了,因为那时文档还没有持久化。为了等待这些文档被插入,我正在做这样的事情-
Flux.fromIterable(couchDocs)
.map(couchDoc -> bucket.upsert(couchDoc, persistTo)
.collectList()
.block();
但是,当我运行测试时,我仍然可以看到文档到那时还没有持久化,Assert失败。我是不是漏掉了什么?
3条答案
按热度按时间webghufk1#
使用.
flatMap
而不是.map
。你的内在流没有被订阅。hs1ihplo2#
在使用块()将项目插入到Couchbase中,然后在没有项目的情况下执行GET时,我遇到了同样的问题。似乎在块()之后到upsert有一段很短的时间,Get才能找到项目。这不是应该完成的块()的预期行为。
如果我这么做了
创建
线程。睡眠(5000)
获取项目
它工作得很好。
zpjtge223#
我认为您可能需要将此添加到您的查询中
它基本上是等到Couchbase上的所有索引在内部完成后才继续查询。它修复了我的问题,即我进行了批量更新,然后尝试执行GET Items,但返回的行数不一致(只需根据已在内部完全解析的Couchbase索引的数量)。令人讨厌的症状,但这是你的荣耀之路!
QueryOptions.queryOptions().scanConsistency(QueryScanConsistency.REQUEST_PLUS)