reactor.core.publisher.Flux.reduce()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(9.7k)|赞(0)|评价(0)|浏览(612)

本文整理了Java中reactor.core.publisher.Flux.reduce()方法的一些代码示例,展示了Flux.reduce()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.reduce()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:reduce

Flux.reduce介绍

[英]Reduce the values from this Flux sequence into a single object matching the type of a seed value. Reduction is performed using a BiFunction that takes the intermediate result of the reduction and the current value and returns the next intermediate value of the reduction. First element is paired with the seed value, initial.
[中]将此通量序列中的值减少为与种子值类型匹配的单个对象。使用双函数执行缩减,该函数获取缩减的中间结果和当前值,并返回缩减的下一个中间值。第一个元素与种子值initial配对。

代码示例

代码示例来源:origin: lettuce-io/lettuce-core

@Override
public Mono<Long> del(Iterable<K> keys) {
  Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keys);
  if (partitioned.size() < 2) {
    return super.del(keys);
  }
  List<Publisher<Long>> publishers = new ArrayList<>();
  for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
    publishers.add(super.del(entry.getValue()));
  }
  return Flux.merge(publishers).reduce((accu, next) -> accu + next);
}

代码示例来源:origin: lettuce-io/lettuce-core

@Override
public Mono<Long> unlink(Iterable<K> keys) {
  Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keys);
  if (partitioned.size() < 2) {
    return super.unlink(keys);
  }
  List<Publisher<Long>> publishers = new ArrayList<>();
  for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
    publishers.add(super.unlink(entry.getValue()));
  }
  return Flux.merge(publishers).reduce((accu, next) -> accu + next);
}

代码示例来源:origin: spring-projects/spring-framework

private Mono<? extends Resource> transform(String content, Resource resource,
    ResourceTransformerChain chain, ServerWebExchange exchange) {
  if (!content.startsWith(MANIFEST_HEADER)) {
    if (logger.isTraceEnabled()) {
      logger.trace(exchange.getLogPrefix() +
          "Skipping " + resource + ": Manifest does not start with 'CACHE MANIFEST'");
    }
    return Mono.just(resource);
  }
  return Flux.generate(new LineInfoGenerator(content))
      .concatMap(info -> processLine(info, exchange, resource, chain))
      .reduce(new ByteArrayOutputStream(), (out, line) -> {
        writeToByteArrayOutputStream(out, line + "\n");
        return out;
      })
      .map(out -> {
        String hash = DigestUtils.md5DigestAsHex(out.toByteArray());
        writeToByteArrayOutputStream(out, "\n" + "# Hash: " + hash);
        return new TransformedResource(resource, out.toByteArray());
      });
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Aggregate response data and convert to a String using the "Content-Type"
 * charset or "UTF-8" by default.
 */
public Mono<String> getBodyAsString() {
  Charset charset = Optional.ofNullable(getHeaders().getContentType()).map(MimeType::getCharset)
      .orElse(StandardCharsets.UTF_8);
  return getBody()
      .reduce(bufferFactory().allocateBuffer(), (previous, current) -> {
        previous.write(current);
        DataBufferUtils.release(current);
        return previous;
      })
      .map(buffer -> bufferToString(buffer, charset));
}

代码示例来源:origin: lettuce-io/lettuce-core

public Mono<Long> touch(Iterable<K> keys) {
  List<K> keyList = LettuceLists.newList(keys);
  Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keyList);
  if (partitioned.size() < 2) {
    return super.touch(keyList);
  }
  List<Publisher<Long>> publishers = new ArrayList<>();
  for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
    publishers.add(super.touch(entry.getValue()));
  }
  return Flux.merge(publishers).reduce((accu, next) -> accu + next);
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Aggregate response data and convert to a String using the "Content-Type"
 * charset or "UTF-8" by default.
 */
public Mono<String> getBodyAsString() {
  Charset charset = Optional.ofNullable(getHeaders().getContentType()).map(MimeType::getCharset)
      .orElse(StandardCharsets.UTF_8);
  return getBody()
      .reduce(bufferFactory().allocateBuffer(), (previous, current) -> {
        previous.write(current);
        DataBufferUtils.release(current);
        return previous;
      })
      .map(buffer -> bufferToString(buffer, charset));
}

代码示例来源:origin: spring-projects/spring-framework

private Mono<? extends Resource> transformContent(String cssContent, Resource resource,
    ResourceTransformerChain chain, ServerWebExchange exchange) {
  List<ContentChunkInfo> contentChunkInfos = parseContent(cssContent);
  if (contentChunkInfos.isEmpty()) {
    return Mono.just(resource);
  }
  return Flux.fromIterable(contentChunkInfos)
      .concatMap(contentChunkInfo -> {
        String contentChunk = contentChunkInfo.getContent(cssContent);
        if (contentChunkInfo.isLink() && !hasScheme(contentChunk)) {
          String link = toAbsolutePath(contentChunk, exchange);
          return resolveUrlPath(link, exchange, resource, chain).defaultIfEmpty(contentChunk);
        }
        else {
          return Mono.just(contentChunk);
        }
      })
      .reduce(new StringWriter(), (writer, chunk) -> {
        writer.write(chunk);
        return writer;
      })
      .map(writer -> {
        byte[] newContent = writer.toString().getBytes(DEFAULT_CHARSET);
        return new TransformedResource(resource, newContent);
      });
}

代码示例来源:origin: reactor/reactor-core

@Override
protected List<Scenario<String, String>> scenarios_operatorSuccess() {
  return Arrays.asList(
      scenario(f -> f.reduce((a, b) -> a))
  );
}

代码示例来源:origin: lettuce-io/lettuce-core

@Override
public Mono<Boolean> msetnx(Map<K, V> map) {
  return pipeliningWithMap(map, kvMap -> RedisAdvancedClusterReactiveCommandsImpl.super.msetnx(kvMap).flux(),
      booleanFlux -> booleanFlux).reduce((accu, next) -> accu && next);
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Return the response body aggregated and converted to a String using the
 * charset of the Content-Type response or otherwise as "UTF-8".
 */
public Mono<String> getBodyAsString() {
  Charset charset = getCharset();
  return Flux.from(getBody())
      .reduce(this.bufferFactory.allocateBuffer(), (previous, current) -> {
        previous.write(current);
        DataBufferUtils.release(current);
        return previous;
      })
      .map(buffer -> dumpString(buffer, charset));
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Aggregate response data and convert to a String using the "Content-Type"
 * charset or "UTF-8" by default.
 */
public Mono<String> getBodyAsString() {
  Charset charset = Optional.ofNullable(getHeaders().getContentType()).map(MimeType::getCharset)
      .orElse(StandardCharsets.UTF_8);
  return getBody()
      .reduce(bufferFactory().allocateBuffer(), (previous, current) -> {
        previous.write(current);
        DataBufferUtils.release(current);
        return previous;
      })
      .map(buffer -> bufferToString(buffer, charset));
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Apply {@link Flux#reduce(Object, BiFunction) reduce} on the body, count
 * the number of bytes produced, release data buffers without writing, and
 * set the {@literal Content-Length} header.
 */
@Override
public final Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
  return Flux.from(body)
      .reduce(0, (current, buffer) -> {
        int next = current + buffer.readableByteCount();
        DataBufferUtils.release(buffer);
        return next;
      })
      .doOnNext(count -> getHeaders().setContentLength(count))
      .then();
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Aggregate response data and convert to a String using the "Content-Type"
 * charset or "UTF-8" by default.
 */
public Mono<String> getBodyAsString() {
  Charset charset = Optional.ofNullable(getHeaders().getContentType()).map(MimeType::getCharset)
      .orElse(StandardCharsets.UTF_8);
  return getBody()
      .reduce(bufferFactory().allocateBuffer(), (previous, current) -> {
        previous.write(current);
        DataBufferUtils.release(current);
        return previous;
      })
      .map(buffer -> bufferToString(buffer, charset));
}

代码示例来源:origin: spring-projects/spring-framework

/**
 * Return the response body aggregated and converted to a String using the
 * charset of the Content-Type response or otherwise as "UTF-8".
 */
public Mono<String> getBodyAsString() {
  Charset charset = getCharset();
  return Flux.from(getBody())
      .reduce(bufferFactory.allocateBuffer(), (previous, current) -> {
        previous.write(current);
        DataBufferUtils.release(current);
        return previous;
      })
      .map(buffer -> dumpString(buffer, charset));
}

代码示例来源:origin: spring-projects/spring-framework

@Test
public void writeAndFlushWith() {
  Mono<String> result = this.webClient.get()
      .uri("/write-and-flush")
      .retrieve()
      .bodyToFlux(String.class)
      .takeUntil(s -> s.endsWith("data1"))
      .reduce((s1, s2) -> s1 + s2);
  StepVerifier.create(result)
      .expectNext("data0data1")
      .expectComplete()
      .verify(Duration.ofSeconds(10L));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void error() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.<Integer>error(new RuntimeException("forced failure")).reduce((a, b) -> a + b)
                                .subscribe(ts);
  ts.assertNoValues()
   .assertError(RuntimeException.class)
   .assertErrorWith(e -> Assert.assertTrue(e.getMessage()
                        .contains("forced failure")))
   .assertNotComplete();
}

代码示例来源:origin: reactor/reactor-core

Mono<Long> scenario_fluxItemCanBeShiftedByTime() {
  return Flux.range(0, 10000)
        .delayElements(Duration.ofMillis(150))
        .elapsed()
        .take(10)
        .reduce(0L,
            (acc, next) -> acc > 0l ? ((next.getT1() + acc) / 2) :
                next.getT1());
}

代码示例来源:origin: reactor/reactor-core

Mono<Long> scenario_fluxItemCanBeShiftedByTime2() {
  return Flux.range(0, 10000)
        .delayElements(Duration.ofMillis(150))
        .elapsed()
        .take(10)
        .reduce(0L,
            (acc, next) -> acc > 0l ? ((next.getT1() + acc) / 2) :
                next.getT1());
}

代码示例来源:origin: spring-projects/spring-framework

@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
  Mono<Integer> requestSizeMono = request.getBody().
      reduce(0, (integer, dataBuffer) -> integer +
          dataBuffer.readableByteCount()).
      doOnSuccessOrError((size, throwable) -> {
        assertNull(throwable);
        assertEquals(REQUEST_SIZE, (long) size);
      });
  response.getHeaders().setContentLength(RESPONSE_SIZE);
  return requestSizeMono.then(response.writeWith(multipleChunks()));
}

代码示例来源:origin: spring-projects/spring-framework

.reduce(this.bufferFactory.allocateBuffer(), (previous, current) -> {
  previous.write(current);
  DataBufferUtils.release(current);

相关文章

Flux类方法