reactor.core.publisher.Flux.toIterable()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(7.4k)|赞(0)|评价(0)|浏览(232)

本文整理了Java中reactor.core.publisher.Flux.toIterable()方法的一些代码示例,展示了Flux.toIterable()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.toIterable()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:toIterable

Flux.toIterable介绍

[英]Transform this Flux into a lazy Iterable blocking on Iterator#next() calls.

Note that iterating from within threads marked as "non-blocking only" is illegal and will cause an IllegalStateException to be thrown, but obtaining the Iterableitself within these threads is ok.
[中]在迭代器#next()调用中将此流量转换为惰性Iterable阻塞。
请注意,从标记为“仅非阻塞”的线程内进行迭代是非法的,并将导致引发IllegalStateException,但在这些线程内获取Iterableitself是可以的。

代码示例

代码示例来源:origin: reactor/reactor-core

/**
 * Transform this {@link Flux} into a lazy {@link Iterable} blocking on
 * {@link Iterator#next()} calls.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/toIterable.svg" alt="">
 * <p>
 * Note that iterating from within threads marked as "non-blocking only" is illegal and will
 * cause an {@link IllegalStateException} to be thrown, but obtaining the {@link Iterable}
 * itself within these threads is ok.
 *
 * @return a blocking {@link Iterable}
 */
public final Iterable<T> toIterable() {
  return toIterable(Queues.SMALL_BUFFER_SIZE);
}

代码示例来源:origin: reactor/reactor-core

/**
 * Transform this {@link Flux} into a lazy {@link Iterable} blocking on
 * {@link Iterator#next()} calls.
 * <p>
 * <img class="marble" src="doc-files/marbles/toIterableWithBatchSize.svg" alt="">
 * <p>
 * Note that iterating from within threads marked as "non-blocking only" is illegal and will
 * cause an {@link IllegalStateException} to be thrown, but obtaining the {@link Iterable}
 * itself within these threads is ok.
 *
 * @param batchSize the bounded capacity to prefetch from this {@link Flux} or
 * {@code Integer.MAX_VALUE} for unbounded demand
 * @return a blocking {@link Iterable}
 */
public final Iterable<T> toIterable(int batchSize) {
  return toIterable(batchSize, null);
}

代码示例来源:origin: reactor/reactor-core

@Test(timeout = 5000, expected = RuntimeException.class)
public void error() {
  List<Integer> values = new ArrayList<>();
  for (Integer i : Flux.<Integer>error(new RuntimeException("forced failure")).toIterable()) {
    values.add(i);
  }
  Assert.assertEquals(Collections.emptyList(), values);
}

代码示例来源:origin: reactor/reactor-core

@Test(timeout = 5000)
public void normal2() {
  Queue<Integer> q = new ArrayBlockingQueue<>(1);
  List<Integer> values = new ArrayList<>();
  for (Integer i : Flux.range(1, 10)
             .toIterable(1, () -> q)) {
    values.add(i);
  }
  Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), values);
}

代码示例来源:origin: reactor/reactor-core

@Test(timeout = 5000)
public void empty() {
  List<Integer> values = new ArrayList<>();
  for (Integer i : FluxEmpty.<Integer>instance().toIterable()) {
    values.add(i);
  }
  Assert.assertEquals(Collections.emptyList(), values);
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void readQueuesFromPublishers() {
//        "Read Queues from Publishers"
//        given: "Iterable publisher of 1000 to read queue"
    List<Integer> thousand = new ArrayList<>(1000);
    for (int i = 1; i <= 1000; i++) {
      thousand.add(i);
    }
    Flux<Integer> pub = Flux.fromIterable(thousand);
    Iterator<Integer> queue = pub.toIterable()
                   .iterator();

//        when: "read the queue"
    Integer v = queue.next();
    Integer v2 = queue.next();
    for (int i = 0; i < 997; i++) {
      queue.next();
    }

    Integer v3 = queue.next();

//        then: "queues values correct"
    assertThat(v).isEqualTo(1);
    assertThat(v2).isEqualTo(2);
    assertThat(v3).isEqualTo(1000);
  }

代码示例来源:origin: reactor/reactor-core

@Test(timeout = 5000)
public void normal() {
  List<Integer> values = new ArrayList<>();
  for (Integer i : Flux.range(1, 10)
             .toIterable()) {
    values.add(i);
  }
  Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), values);
}

代码示例来源:origin: reactor/reactor-core

@Test
  public void fluxCanBeTranslatedToCompletableQueue() {
//        "A deferred Flux can be translated into a completable queue"
//        given:    "a composable with an initial value"
    Flux<String> stream = Flux.just("test", "test2", "test3")
              .log()
              .publishOn(Schedulers.parallel());

//        when: "the flux is retrieved"
    stream = stream.map(it -> it + "-ok")
            .log();

    Iterator<String> queue = stream.toIterable()
                    .iterator();

    List<String> result = new ArrayList<>();

    while (queue.hasNext()) {
      result.add(queue.next());
    }

//        then:"it is available"
    assertThat(result).containsExactly("test-ok", "test2-ok", "test3-ok");
  }

代码示例来源:origin: reactor/reactor-core

@Test(timeout = 1000)
public void gh841_iteratorFromCreate() {
  Iterator<String> it = Flux.<String>create(sink -> {
    sink.next("a");
    sink.next("b");
    sink.complete();
  }).sort((a, b) -> {
    throw new IllegalStateException("boom");
  }).toIterable().iterator();
  assertThatExceptionOfType(IllegalStateException.class)
      .isThrownBy(it::hasNext)
      .withMessage("boom");
}

代码示例来源:origin: reactor/reactor-core

try {
  ref.set(Flux.just(1, 2, 3)
        .toIterable());

代码示例来源:origin: reactor/reactor-core

/**
 * See https://github.com/reactor/reactor-core/issues/508
 */
@Test
public void testPublishingTwice() {
  StepVerifier.create(Flux.just(Flux.range(0, 300).toIterable(), Flux.range(0, 300).toIterable())
      .flatMapIterable(x -> x)
      .share()
      .share()
      .count())
      .expectNext(600L)
      .verifyComplete();
}

代码示例来源:origin: spring-projects/spring-integration

iterator = Flux.from((Publisher<Object>) result).toIterable().iterator();

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @param batchSize
 * @return
 * @see reactor.core.publisher.Flux#toIterable(long)
 */
public final Iterable<T> toIterable(long batchSize) {
  return boxed.toIterable(batchSize);
}
/**

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @return
 * @see reactor.core.publisher.Flux#toIterable()
 */
public final Iterable<T> toIterable() {
  return boxed.toIterable();
}
/**

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @param batchSize
 * @param queueProvider
 * @return
 * @see reactor.core.publisher.Flux#toIterable(long, java.util.function.Supplier)
 */
public final Iterable<T> toIterable(long batchSize, Supplier<Queue<T>> queueProvider) {
  return boxed.toIterable(batchSize, queueProvider);
}
/**

代码示例来源:origin: reactor/lite-rx-api-hands-on

@Override
public Iterable<User> findAll() {
  callCount++;
  return reactiveRepository.findAll().toIterable();
}

代码示例来源:origin: spring-cloud/spring-cloud-function

private Object result(Object input, Publisher<?> output) {
  List<Object> result = new ArrayList<>();
  for (Object value : Flux.from(output).toIterable()) {
    result.add(value);
  }
  if (isSingleValue(input) && result.size() == 1) {
    return result.get(0);
  }
  return result;
}

代码示例来源:origin: spring-cloud/spring-cloud-function

private Object result(Object input, Publisher<?> flux) {
  List<Object> result = new ArrayList<>();
  for (Object value : Flux.from(flux).toIterable()) {
    result.add(value);
  }
  if (isSingleValue(input) && result.size()==1) {
    return result.get(0);
  }
  return result;
}

代码示例来源:origin: spring-cloud/spring-cloud-function

private Object result(Object input, Publisher<?> output) {
  List<O> result = new ArrayList<>();
  for (Object value : Flux.from(output).toIterable()) {
    result.add(convertOutput(value));
  }
  if (isSingleValue(input) && result.size() == 1) {
    return result.get(0);
  }
  return result;
}

代码示例来源:origin: mohitsinha/tutorials

@Test
public void block() {
  String name = Mono.just("Jesse").block();
  assertEquals("Jesse", name);
  Iterator<String> namesIterator = Flux.just("Tom", "Peter").toIterable().iterator();
  assertEquals("Tom", namesIterator.next());
  assertEquals("Peter", namesIterator.next());
  assertFalse(namesIterator.hasNext());
}

相关文章

Flux类方法