如何在React流中重新分配变量?

wribegjk  于 2021-07-13  发布在  Java
关注(0)|答案(2)|浏览(311)

我将请求发送到Web服务,将结果转换为一个大的 csv 并将csv行保存到数据库中。
由于请求是长时间运行的(10-20秒),我想并行化请求。我一次收集了所有的数据 StringBuilder 保存转换的csv行的。
问题:如果在csv中达到了1000行的数据块,那么如何将数据取出进行持久化,而任何其他并发响应都将写入新的 StringBuilder ?
因为,流的最终变量无法重新初始化。

final StringBuilder sb = new StringBuilder();
AtomicInteger count = new AtomicInteger();

Flux.fromIterable(requests)
    .flatMap(req -> {
        return webClientService.send(req); //assume long running response
    }, 8) //send 8 requests in parallel, as response takes up to 10s
    .map(rsp -> {
        //convert response to csv values and add to StringBuilder
        int c = addCsv(sb, rsp);
        if (count.addAndGet(c) > 1000) {
            //TODO how can I assign a new StringBuilder,
            //so that all further finished responses will append the csv to the new builder?
            //same problem with the counter.

            databaseWriter.write(sb.build()); //writes the content so far to db, but not threadsafe so far
        }
        return c;
    })
    .blockLast();
h5qlskok

h5qlskok1#

或许你可以尝试完全避免副作用,例如:

.map(x -> toCsv(x))
.reduce((a, b) -> {
    if (length(a) < 1000) {
        return concat(a, b);
    }
    databaseWriter.write(a);
    return b;
})
.doOnNext(x -> databaseWriter.write(x))
nxagd54h

nxagd54h2#

在我看来,您可以使用内置运算符来实现相同的结果:

Flux.fromIterable(requests)
            .flatMap(req -> webClientService
                    .send(req)
                    .subscribeOn(Schedulers.boundedElastic()), 8)// subscribeOn to subscribe from different threads
            .map(resp -> converToCsvLine(resp)) //make some transformations on the respnse
            .window(1000) //split incoming data into 1000 lines
            .flatMap(stringFlux -> stringFlux.collect(Collectors.joining("\n")))// collect last 1000
            .flatMap(s -> Mono.fromRunnable(() -> writeToDb(s))) //do some logic on the collected 1000 lines
            .blockLast();

相关问题