Usage and code examples for the reactor.core.publisher.Flux.generate() method

x33g5p2x, reposted on 2022-01-19 in Other

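This article collects code examples of the Java method reactor.core.publisher.Flux.generate() and shows how Flux.generate() is used in practice. The examples were taken mainly from GitHub, Stack Overflow, and Maven, extracted from selected projects, and are intended as a practical reference. Details of the Flux.generate() method:
Fully qualified class name: reactor.core.publisher.Flux
Class name: Flux
Method name: generate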

Introduction to Flux.generate

Programmatically create a Flux by generating signals one-by-one via a consumer callback and some state. The stateSupplier may return null.
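
As a minimal sketch of the description above (assuming reactor-core is on the classpath; the class name GenerateSketch and the method countTo are hypothetical names chosen for illustration), the two-argument overload can be used to thread a counter through successive generator calls:

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical illustration class; not part of Reactor or of the projects quoted below.
class GenerateSketch {

    // Emits 1..limit, one value per generator invocation, then completes.
    static Flux<Integer> countTo(int limit) {
        return Flux.<Integer, Integer>generate(
                () -> 1,                    // stateSupplier: initial state, created per subscriber
                (state, sink) -> {          // generator: emits at most one signal per call
                    if (state > limit) {
                        sink.complete();    // terminal signal, no further calls
                    } else {
                        sink.next(state);   // exactly one next() in this round
                    }
                    return state + 1;       // returned value is passed in as the next state
                });
    }

    public static void main(String[] args) {
        List<Integer> values = countTo(5).collectList().block();
        System.out.println(values);         // prints [1, 2, 3, 4, 5]
    }
}
```

The generator is only invoked when the subscriber has requested more elements, so the sequence is produced on demand rather than eagerly.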

Code examples

Code example source: spring-projects/spring-framework

/**
 * Obtain a {@link ReadableByteChannel} from the given supplier, and read it into a
 * {@code Flux} of {@code DataBuffer}s. Closes the channel when the flux is terminated.
 * @param channelSupplier the supplier for the channel to read from
 * @param dataBufferFactory the factory to create data buffers with
 * @param bufferSize the maximum size of the data buffers
 * @return a flux of data buffers read from the given channel
 */
public static Flux<DataBuffer> readByteChannel(
    Callable<ReadableByteChannel> channelSupplier, DataBufferFactory dataBufferFactory, int bufferSize) {
  Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
  Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
  Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
  return Flux.using(channelSupplier,
      channel -> {
        ReadableByteChannelGenerator generator =
            new ReadableByteChannelGenerator(channel, dataBufferFactory,
                bufferSize);
        return Flux.generate(generator);
      },
      DataBufferUtils::closeChannel)
      .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
}

Code example source: spring-projects/spring-framework

private Flux<String> chunks1K() {
  return Flux.generate(sink -> {
    StringBuilder sb = new StringBuilder();
    do {
      for (char c : "0123456789".toCharArray()) {
        sb.append(c);
        if (sb.length() + 1 == 1024) {
          sink.next(sb.append("\n").toString());
          return;
        }
      }
    } while (true);
  });
}

Code example source: spring-projects/spring-framework

private Mono<? extends Resource> transform(String content, Resource resource,
    ResourceTransformerChain chain, ServerWebExchange exchange) {
  if (!content.startsWith(MANIFEST_HEADER)) {
    if (logger.isTraceEnabled()) {
      logger.trace(exchange.getLogPrefix() +
          "Skipping " + resource + ": Manifest does not start with 'CACHE MANIFEST'");
    }
    return Mono.just(resource);
  }
  return Flux.generate(new LineInfoGenerator(content))
      .concatMap(info -> processLine(info, exchange, resource, chain))
      .reduce(new ByteArrayOutputStream(), (out, line) -> {
        writeToByteArrayOutputStream(out, line + "\n");
        return out;
      })
      .map(out -> {
        String hash = DigestUtils.md5DigestAsHex(out.toByteArray());
        writeToByteArrayOutputStream(out, "\n" + "# Hash: " + hash);
        return new TransformedResource(resource, out.toByteArray());
      });
}

Code example source: reactor/reactor-core

@Test(expected = NullPointerException.class)
public void stateSupplierNull() {
  Flux.generate(null, (s, o) -> s, s -> {
  });
}

Code example source: reactor/reactor-core

@Test(expected = NullPointerException.class)
public void generatorNull() {
  Flux.generate(() -> 1, null, s -> {
  });
}

Code example source: reactor/reactor-core

@Test(expected = NullPointerException.class)
public void stateConsumerNull() {
  Flux.generate(() -> 1, (s, o) -> s, null);
}

Code example source: reactor/reactor-core

@Override
public Publisher<Long> createPublisher(long elements) {
  return Flux.<Long, Long>generate(() -> 0L, (cursor, s) -> {
    if (cursor < elements) {
      s.next(cursor);
    }
    else if (cursor == elements) {
      s.complete();
    }
    return cursor + 1;
  })
      .map(data -> data * 10)
      .map( data -> data / 10)
      .log("log-test", Level.FINE);
}

Code example source: spring-projects/spring-framework

@Test
public void errorAfterMultipleItems() throws Exception {
  IllegalStateException error = new IllegalStateException("boo");
  Flux<String> publisher = Flux.generate(() -> 0, (idx , subscriber) -> {
    int i = ++idx;
    subscriber.next(String.valueOf(i));
    if (i == 3) {
      subscriber.error(error);
    }
    return i;
  });
  Mono<Void> completion = publisher.as(this::sendOperator);
  Signal<Void> signal = completion.materialize().block();
  assertNotNull(signal);
  assertSame("Unexpected signal: " + signal, error, signal.getThrowable());
  assertEquals(3, this.writer.items.size());
  assertEquals("1", this.writer.items.get(0));
  assertEquals("2", this.writer.items.get(1));
  assertEquals("3", this.writer.items.get(2));
  assertSame(error, this.writer.error);
}

Code example source: reactor/reactor-core

@Test
public void generatorMultipleOnErrors() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.<Integer>generate(o -> {
    o.error(new RuntimeException("forced failure"));
    o.error(new RuntimeException("forced failure"));
  }).subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertError(RuntimeException.class)
   .assertErrorMessage("forced failure");
}

Code example source: reactor/reactor-core

@Test
public void stateConsumerCalled() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  AtomicInteger stateConsumer = new AtomicInteger();
  Flux.<Integer, Integer>generate(() -> 1, (s, o) -> {
    o.complete();
    return s;
  }, stateConsumer::set).subscribe(ts);
  ts.assertNoValues()
   .assertComplete()
   .assertNoError();
  Assert.assertEquals(1, stateConsumer.get());
}

Code example source: reactor/reactor-core

@Test
public void generatorThrows() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.<Integer>generate(o -> {
    throw new RuntimeException("forced failure");
  }).subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertError(RuntimeException.class)
   .assertErrorMessage("forced failure");
}

Code example source: reactor/reactor-core

@Test
public void contextTest() {
  StepVerifier.create(Flux.generate(s -> s.next(s.currentContext()
                          .get(AtomicInteger.class)
                          .incrementAndGet()))
              .take(10)
              .subscriberContext(ctx -> ctx.put(AtomicInteger.class,
                  new AtomicInteger())))
        .expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .verifyComplete();
}

Code example source: reactor/reactor-core

@Test
public void stateSupplierThrows() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.<Integer, Integer>generate(() -> {
    throw new RuntimeException("forced failure");
  }, (s, o) -> {
    o.next(1);
    return s;
  }).subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertError(RuntimeException.class);
}

Code example source: reactor/reactor-core

@Test
public void generateError() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.<Integer>generate(o -> {
    o.error(new RuntimeException("forced failure"));
  }).subscribe(ts);
  ts.assertNoValues()
   .assertNotComplete()
   .assertError(RuntimeException.class)
   .assertErrorMessage("forced failure");
}

Code example source: reactor/reactor-core

@Test
public void generateEmpty() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.<Integer>generate(o -> {
    o.complete();
  }).subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void generatorMultipleOnCompletes() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.<Integer>generate(o -> {
    o.complete();
    o.complete();
  }).subscribe(ts);
  ts.assertNoValues()
   .assertComplete()
   .assertNoError();
}

Code example source: reactor/reactor-core

@Test
public void generatorMultipleOnNexts() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.<Integer>generate(o -> {
    o.next(1);
    o.next(1);
  }).subscribe(ts);
  ts.assertValues(1)
   .assertNotComplete()
   .assertError(IllegalStateException.class);
}
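
The test above relies on the rule that a generator may call next() at most once per invocation. The following plain-Java model is a rough sketch of that rule only. It is not Reactor's implementation, and MiniSink and GenerateModel are hypothetical names. It ignores demand, state handling, and the other checks the real operator performs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-in for a synchronous sink; NOT Reactor's SynchronousSink.
class MiniSink<T> {
    T value;
    boolean hasValue;
    boolean completed;
    Throwable error;

    void next(T t) {
        if (hasValue) {
            // A second next() in the same round violates the contract.
            error = new IllegalStateException("More than one call to next");
            return;
        }
        value = t;
        hasValue = true;
    }

    void complete() {
        completed = true;
    }
}

// Hypothetical driver that runs the generator one round at a time.
class GenerateModel {
    final List<Integer> values = new ArrayList<>();
    Throwable error;
    boolean completed;

    static GenerateModel run(Consumer<MiniSink<Integer>> generator, int maxRounds) {
        GenerateModel result = new GenerateModel();
        for (int round = 0; round < maxRounds; round++) {
            MiniSink<Integer> sink = new MiniSink<>();  // fresh sink state for each round
            generator.accept(sink);
            if (sink.hasValue) {
                result.values.add(sink.value);          // the first value is still delivered
            }
            if (sink.error != null) {                   // then the violation ends the sequence
                result.error = sink.error;
                break;
            }
            if (sink.completed) {
                result.completed = true;
                break;
            }
        }
        return result;
    }
}
```

Running the model with a generator that calls next(1) twice records the single value 1 and then an IllegalStateException, which mirrors the assertions in the test above.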

Code example source: reactor/reactor-core

@Test
public void generateJust() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.<Integer>generate(o -> {
    o.next(1);
    o.complete();
  }).subscribe(ts);
  ts.assertValues(1)
   .assertNoError()
   .assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void generateRange() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  Flux.<Integer, Integer>generate(() -> 1, (s, o) -> {
    if (s < 11) {
      o.next(s);
    }
    else {
      o.complete();
    }
    return s + 1;
  }).subscribe(ts);
  ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
   .assertNoError()
   .assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void generateJustBackpressured() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create(0);
  Flux.<Integer>generate(o -> {
    o.next(1);
    o.complete();
  }).subscribe(ts);
  ts.assertNoValues()
   .assertNoError()
   .assertNotComplete();
  ts.request(2);
  ts.assertValues(1)
   .assertNoError()
   .assertComplete();
}
