简单循环javaReact器post平面图

iswrvxsc  于 2023-01-16  发布在  Java
关注(0)|答案(2)|浏览(127)

我的问题是,我做了一个post请求,以获得我的数据库中元素的总数,我需要做一个for循环,直到我达到整数除10的数字。
我当前无法工作的代码

protected Mono<List<Long>> getAllSubscriptionIds(ProductCode productCode) {
    List<Long> subscriptionIds = new ArrayList<>();

    String body = "{\n" +
            " \"productCodes\": [\"" + productCode.name() + "\"],\n" +
            " \"pagination\": {\n" +
            "     \"offset\": 0,\n" +
            "     \"limit\": 10" +
            "\n  }\n" +
            " }";
    //first post where I get the number of elements in my db
    return restClient.post(
                    "https://" + url,
                    buildRequiredHeaders(),
                    body,
                    String.class
            )
            .onErrorMap(err-> new RuntimeException(err.getMessage()))
            .flatMap(response -> {
                log.debug(response);
                ResponseModel<DataLakeCallResponse<JsonNode>> variable = null;
                try {
                    variable = JsonUtil.fromString(response, new TypeReference<ResponseModel<DataLakeCallResponse<JsonNode>>>() {
                    });
                    log.debug(response);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                variable.getPayload().getList().forEach(
                        object-> subscriptionIds.add(object.get("subscriptionId").asLong()));
                //if number of elements > 10
                if(variable.getPayload().getPagination().getResultCount() > 10){
                    //for loop on that number / 10 (so that I can use an offset
                    for (int i = 0; i < variable.getPayload().getPagination().getResultCount() / 10; i++){
                        String bodyI = "{\n" +
                                " \"productCodes\": [\"" + productCode.name() + "\"],\n" +
                                " \"pagination\": {\n" +
                                "     \"offset\": " + (i + 1) * 10 + ",\n" +
                                "     \"limit\": 10\n" +
                                "  }\n" +
                                " }";
                        return restClient.post(
                                        "https://" + url,
                                        buildRequiredHeaders(),
                                        bodyI,
                                        String.class
                                )
                                .onErrorMap(err-> new RuntimeException(err.getMessage()))
                                .flatMap(resp -> {
                                    ResponseModel<DataLakeCallResponse<JsonNode>> varia = null;
                                    try {
                                        varia = JsonUtil.fromString(resp, new TypeReference<ResponseModel<DataLakeCallResponse<JsonNode>>>() {
                                        });
                                    } catch (IOException e) {
                                        throw new RuntimeException(e);
                                    }
                                    varia.getPayload().getList().forEach(
                                            object-> subscriptionIds.add(object.get("subscriptionId").asLong()));

                                    return Mono.just(subscriptionIds);
                                });
                    }
                }
                return Mono.just(subscriptionIds);
            });
}

我确实理解为什么它不起作用(它在for循环中返回),但我真的不明白我能用什么替代方法来使它起作用。我尝试了一个外部方法,但它仍然失败。我尝试了Mono.zip,但我认为我尝试错了。
这是我尝试过的替代方案,但仍然不起作用。

protected Mono<Object> getAllSubscriptionIds(ProductCode productCode) {
this.counter = 0;
List<Long> subscriptionIds = new ArrayList<>();
List<Mono<Integer>> resultList = new ArrayList<>();

String body = "{\n" +
        " \"productCodes\": [\"" + productCode.name() + "\"],\n" +
        " \"pagination\": {\n" +
        "     \"offset\": 0,\n" +
        "     \"limit\": 10" +
        "\n  }\n" +
        " }";

return restClient.post(
                "https://" + url,
                buildRequiredHeaders(),
                body,
                String.class
        )
        .onErrorMap(err-> new RuntimeException(err.getMessage()))
        .flatMap(response -> {
            log.debug(response);
            ResponseModel<DataLakeCallResponse<JsonNode>> variable = null;
            try {
                variable = JsonUtil.fromString(response, new TypeReference<ResponseModel<DataLakeCallResponse<JsonNode>>>() {
                });
                log.debug(response);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            variable.getPayload().getList().forEach(
                    object-> subscriptionIds.add(object.get("subscriptionId").asLong()));

            if(variable.getPayload().getPagination().getResultCount() > 10){
                for (int i = 0; i < variable.getPayload().getPagination().getResultCount() / 10; i++){
                    resultList.add(Mono.just(i));
                }
            }

            return Mono.zip(resultList, intMono -> {
                this.counter++;
                String bodyI = "{\n" +
                        " \"productCodes\": [\"" + productCode.name() + "\"],\n" +
                        " \"pagination\": {\n" +
                        "     \"offset\": " + this.counter * 10 + ",\n" +
                        "     \"limit\": 10\n" +
                        "  }\n" +
                        " }";
                return restClient.post(
                                "https://" + url,
                                buildRequiredHeaders(),
                                bodyI,
                                String.class
                        )
                        .onErrorMap(err-> new RuntimeException(err.getMessage()))
                        .flatMap(resp -> {
                            ResponseModel<DataLakeCallResponse<JsonNode>> varia = null;
                            try {
                                varia = JsonUtil.fromString(resp, new TypeReference<ResponseModel<DataLakeCallResponse<JsonNode>>>() {
                                });
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                            varia.getPayload().getList().forEach(
                                    object-> subscriptionIds.add(object.get("subscriptionId").asLong()));

                            return Mono.just(subscriptionIds);
                        });
            });
           // return Mono.just(subscriptionIds);
        });
}

你知道怎么解决吗?

s4chpxco

s4chpxco1#

代码的问题是在for循环内部返回,这会导致函数在循环的第一次迭代后立即返回。2您可以使用flatMap操作符来保持管道运行,并将每次迭代的结果添加到subscriptionIds中,而不是返回

2fjabf4q

2fjabf4q2#

好吧我终于有办法了

protected Flux<Object> getAllSubscriptionIds(ProductCode productCode) {
    List<Long> subscriptionIds = new ArrayList<>();
    AtomicInteger i = new AtomicInteger();

    String body = "{\n" +
            " \"productCodes\": [\"" + productCode.name() + "\"],\n" +
            " \"pagination\": {\n" +
            "     \"offset\": 0,\n" +
            "     \"limit\": 1000" +
            "\n  }\n" +
            " }";

    return restClient.post(
                    "https://" + url,
                    buildRequiredHeaders(),
                    body,
                    String.class
            )
            .onErrorMap(err-> new RuntimeException(err.getMessage()))
            .flatMapMany(response -> {
                log.debug(response);
                ResponseModel<DataLakeCallResponse<JsonNode>> variable = null;
                try {
                    variable = JsonUtil.fromString(response, new TypeReference<ResponseModel<DataLakeCallResponse<JsonNode>>>() {
                    });
                    log.debug(response);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                variable.getPayload().getList().forEach(
                        object-> subscriptionIds.add(object.get("subscriptionId").asLong()));

                if(variable.getPayload().getPagination().getResultCount() > 1000){
                        String bodyI = "{\n" +
                                " \"productCodes\": [\"" + productCode.name() + "\"],\n" +
                                " \"pagination\": {\n" +
                                "     \"offset\": " + i.incrementAndGet() * 1000 + ",\n" +
                                "     \"limit\": 1000\n" +
                                "  }\n" +
                                " }";
                        return restClient.post(
                                        "https://" + url,
                                        buildRequiredHeaders(),
                                        bodyI,
                                        String.class
                                )
                                .onErrorMap(err-> new RuntimeException(err.getMessage()))
                                .flatMap(resp -> {
                                    return restClient.post(
                                                    "https://" + url,
                                                    buildRequiredHeaders(),
                                                    "{\n" +
                                                            " \"productCodes\": [\"" + productCode.name() + "\"],\n" +
                                                            " \"pagination\": {\n" +
                                                            "     \"offset\": " + i.incrementAndGet() * 1000 + ",\n" +
                                                            "     \"limit\": 1000\n" +
                                                            "  }\n" +
                                                            " }",
                                                    String.class
                                            )
                                            .onErrorMap(err-> new RuntimeException(err.getMessage()))
                                            .flatMap(respI -> {
                                                ResponseModel<DataLakeCallResponse<JsonNode>> varia = null;
                                                try {
                                                    varia = JsonUtil.fromString(respI, new TypeReference<ResponseModel<DataLakeCallResponse<JsonNode>>>() {
                                                    });
                                                } catch (IOException e) {
                                                    throw new RuntimeException(e);
                                                }
                                                varia.getPayload().getList().forEach(
                                                        object-> subscriptionIds.add(object.get("subscriptionId").asLong()));

                                                return Mono.just(subscriptionIds);
                                            });
                                }).repeat(variable.getPayload().getPagination().getResultCount() / 1000);
                }
                return Mono.just(subscriptionIds);
            });
}

基本上,我将第一个flatMap更改为flatMapMany,以便在其中可以有一个带repeat循环的flatMap。我必须返回一个Flux而不是原来的Mono,但由于我知道无论如何它总是会导致Mono,所以我将原来的调用者更改为

return getAllSubscriptionIds(request.getEventMetadata().getProductCode()).collect(Collectors.reducing((i1, i2) -> i1)).flatMap(responseIds -> {
        List<BillableApiCall> queryResults = dataLakeMapper.getBillableCallsApiCheckIban(
                ((ArrayList<Long>)responseIds.get()),
                DateUtil.toLocalDateEuropeRome(request.getFromDate()),
                DateUtil.toLocalDateEuropeRome(request.getToDate()),
                request.getPagination()
        );

所以我不得不添加.collect(Collectors.reducing((i1,i2)-〉i1))(我复制/粘贴了它,所以我只能猜测它的功能...它将通量转换为单声道),并使用((ArrayList)responseIds.get())投射我的响应ID。
repeat不是最终解决方案,因为它只重复flatMap内部的内容(它不会重复连接到它的帖子)所以我不得不使用一个技巧...我删除了不必要的for循环,我在我的flatMap中用另一个flatMap重复了一个帖子...唯一缺少的东西是跟踪我的索引,我能够发现你可以使用AtomicInteger来做这件事。这并不是一件容易的事,但我测试了它,它的工作。

  1. flatMapMany,其中包含一个repeat flatMap(repeat只需要一个long作为参数,所以它会一直重复,直到达到那个值并自动递增......但是根据我的理解,您不能使用这个索引)。
  2. flatMap repeat内的另一个flatMap,这是因为如果没有此解决方案,您无法执行另一个post调用,因为repeat将仅重复flatMap内的内容(不是之前的post调用,但可以在其中执行post调用)。
    1.一个AtomicInteger作为索引。
    1.改变返回类型为通量,收集和铸造。
    希望有人能从我的头痛中受益。

相关问题