java 如何使用WebFlux从服务器流式传输数据

cnh2zyt3  于 2023-02-02  发布在  Java
关注(0)|答案(2)|浏览(251)

我的任务是简单地使控制器,给我的结果立即当他们准备好了(简单的例子如下)

想象一下

我想得到字符串的确切数量(例如1000字符串,不知何故***1秒***)(实际上我需要得到func的结果,但为了简化任务,只有字符串)
因此,当我在控制器中收到一些请求时,我希望它***在它们准备就绪时***立即给出答案(不缓冲结果),方式如下:

我想要的是:

1秒
"某个字符串"-〉(发送响应到我的前端)
1秒
"另一个"-〉(发送响应到我的前端)
1秒
"第三个"-〉(发送响应到我的前端)....

但我得到的是:

一千秒
"某个字符串"
.....
"千弦"
下面是我的代码:

@GetMapping(value = "/3", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> get3() {
        System.out.println("get3 start");
        Flux<String> result = Flux.fromStream(IntStream.range(1, 10).mapToObj(i -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "flux data--" + i;
        }));
        System.out.println("get3 end");
        return result;

    }

实际上在我的控制台里
立即执行"get3 start"和"get3 end",但仅在所有字符串准备就绪后响应
我的实际服务这个任务是类似的(但我合并2通量在这里),我得到的通量是由间隔,所以我希望它给我的结果,只要他们出现

public Flux<AnswerCalculationDto> calculate(CalculationDto calculationDto){
        String checkMsg = checkCalculationDto(calculationDto);
        if(checkMsg.equals("Success")){//valid
            Long quantity = Long.parseLong(calculationDto.getQuantity());

            Flux<AnswerCalculationDto> firstFunc =  Flux.interval(interval)//func 1
                    .onBackpressureDrop()
                    .takeWhile((i)-> i < quantity)
                    .map((i)->new AnswerCalculationDto(i,1,translateToJava(calculationDto.getFunc1(),i)))
                    ;
            Flux<AnswerCalculationDto> secondFunc = Flux.interval(interval) //func 2
                    .onBackpressureDrop()
                    .takeUntil((i)-> i > quantity-2)
                    .map((i)->new AnswerCalculationDto(i,2,translateToJava(calculationDto.getFunc2(),i)) )
                    ;
            return Flux.merge(firstFunc,secondFunc);
        }
        else {//invalid data from client
            return Flux.just(new AnswerCalculationDto("",checkMsg));
        }

    }
lbsnaicq

lbsnaicq1#

使用WebFlux从服务器流式传输数据有以下几个选项:

      • 服务器发送的事件**推送单个事件(媒体类型:x1月1x)
      • 流事件**由换行符分隔(媒体类型:(x月1日至1x日)

下面是一个完整的示例,它公开text/event-streamapplication/x-ndjson端点,并以json返回数据。如果需要纯文本内容,请使用text/event-stream

@RestController
public class StreamingController {

    @GetMapping(produces = TEXT_EVENT_STREAM_VALUE)
    Flux<DataEntry> sse() {
        return stream();
    }

    @GetMapping(produces = APPLICATION_NDJSON_VALUE)
    Flux<DataEntry> ndjson() {
        return stream();
    }

    private Flux<DataEntry> stream() {
        return Flux.range(1, 1000)
                .delayElements(Duration.ofSeconds(1))
                .map(i -> new DataEntry(i, Instant.now()));
    }

    @Value
    @Builder
    private static class DataEntry {
        long index;
        Instant timestamp;
    }
}

要测试text/event-stream,请使用:
curl -v -H "Accept: text/event-stream" http://localhost:8080

> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: text/event-stream
> 
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: text/event-stream;charset=UTF-8
< 
data:{"index":1,"timestamp":"2022-04-08T14:41:06.513352Z"}

data:{"index":2,"timestamp":"2022-04-08T14:41:07.527817Z"}

data:{"index":3,"timestamp":"2022-04-08T14:41:08.541706Z"}

data:{"index":4,"timestamp":"2022-04-08T14:41:09.553329Z"}

要测试application/x-ndjson,请使用:
curl -v -H "Accept: application/x-ndjson" http://localhost:8080

> GET / HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: application/x-ndjson
>
< HTTP/1.1 200 OK
< transfer-encoding: chunked
< Content-Type: application/x-ndjson
< 
{"index":1,"timestamp":"2022-04-08T14:42:36.081269Z"}
{"index":2,"timestamp":"2022-04-08T14:42:37.094928Z"}
{"index":3,"timestamp":"2022-04-08T14:42:38.109378Z"}
{"index":4,"timestamp":"2022-04-08T14:42:39.121315Z"}

上面的例子将以1秒的间隔生成1000条记录。

private Flux<DataEntry> stream() {
    return Flux.interval(Duration.ofSeconds(1))
            .map(i -> new DataEntry(i, Instant.now()));
}
ipakzgxi

ipakzgxi2#

我正在寻找的是Http流,此外,请注意,Safari以及 Postman 和axios(js lib -我用它在我的前端部分)不支持http流,所以你不能看到你的输出出现,只要一个结果准备好(只有所有结果在1个响应),尝试在Chrome。
另外,如果您像我一样纠结于frontend part,请尝试搜索SSE - server-sent-events,例如:https://turkogluc.com/server-sent-events-with-spring-boot-and-reactjs/
我希望这会有所帮助

相关问题