我将请求发送到Web服务,将结果转换为一个大的 csv
并将csv行保存到数据库中。
由于请求是长时间运行的(10-20秒),我想并行化请求。我一次收集了所有的数据 StringBuilder
保存转换的csv行的。
问题:如果在csv中达到了1000行的数据块,那么如何将数据取出进行持久化,而任何其他并发响应都将写入新的 StringBuilder
?
因为,流的最终变量无法重新初始化。
final StringBuilder sb = new StringBuilder();
AtomicInteger count = new AtomicInteger();
Flux.fromIterable(requests)
.flatMap(req -> {
return webClientService.send(req); //assume long running response
}, 8) //send 8 requests in parallel, as response takes up to 10s
.map(rsp -> {
//convert response to csv values and add to StringBuilder
int c = addCsv(sb, rsp);
if (count.addAndGet(c) > 1000) {
//TODO how can I assign a new StringBuilder,
//so that all further finished responses will append the csv to the new builder?
//same problem with the counter.
databaseWriter.write(sb.build()); //writes the content so far to db, but not threadsafe so far
}
return c;
})
.blockLast();
2条答案
按热度按时间h5qlskok1#
或许你可以尝试完全避免副作用,例如:
nxagd54h2#
在我看来,您可以使用内置运算符来实现相同的结果: