本文整理了Java中reactor.core.publisher.Flux.toIterable()
方法的一些代码示例,展示了Flux.toIterable()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.toIterable()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:toIterable
[英]Transform this Flux into a lazy Iterable blocking on Iterator#next() calls.
Note that iterating from within threads marked as "non-blocking only" is illegal and will cause an IllegalStateException to be thrown, but obtaining the Iterableitself within these threads is ok.
[中]在迭代器#next()调用中将此流量转换为惰性Iterable阻塞。
请注意,从标记为“仅非阻塞”的线程内进行迭代是非法的,并将导致引发IllegalStateException,但在这些线程内获取Iterableitself是可以的。
代码示例来源:origin: reactor/reactor-core
/**
* Transform this {@link Flux} into a lazy {@link Iterable} blocking on
* {@link Iterator#next()} calls.
*
* <p>
* <img class="marble" src="doc-files/marbles/toIterable.svg" alt="">
* <p>
* Note that iterating from within threads marked as "non-blocking only" is illegal and will
* cause an {@link IllegalStateException} to be thrown, but obtaining the {@link Iterable}
* itself within these threads is ok.
*
* @return a blocking {@link Iterable}
*/
public final Iterable<T> toIterable() {
return toIterable(Queues.SMALL_BUFFER_SIZE);
}
代码示例来源:origin: reactor/reactor-core
/**
* Transform this {@link Flux} into a lazy {@link Iterable} blocking on
* {@link Iterator#next()} calls.
* <p>
* <img class="marble" src="doc-files/marbles/toIterableWithBatchSize.svg" alt="">
* <p>
* Note that iterating from within threads marked as "non-blocking only" is illegal and will
* cause an {@link IllegalStateException} to be thrown, but obtaining the {@link Iterable}
* itself within these threads is ok.
*
* @param batchSize the bounded capacity to prefetch from this {@link Flux} or
* {@code Integer.MAX_VALUE} for unbounded demand
* @return a blocking {@link Iterable}
*/
public final Iterable<T> toIterable(int batchSize) {
return toIterable(batchSize, null);
}
代码示例来源:origin: reactor/reactor-core
@Test(timeout = 5000, expected = RuntimeException.class)
public void error() {
List<Integer> values = new ArrayList<>();
for (Integer i : Flux.<Integer>error(new RuntimeException("forced failure")).toIterable()) {
values.add(i);
}
Assert.assertEquals(Collections.emptyList(), values);
}
代码示例来源:origin: reactor/reactor-core
@Test(timeout = 5000)
public void normal2() {
Queue<Integer> q = new ArrayBlockingQueue<>(1);
List<Integer> values = new ArrayList<>();
for (Integer i : Flux.range(1, 10)
.toIterable(1, () -> q)) {
values.add(i);
}
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), values);
}
代码示例来源:origin: reactor/reactor-core
@Test(timeout = 5000)
public void empty() {
List<Integer> values = new ArrayList<>();
for (Integer i : FluxEmpty.<Integer>instance().toIterable()) {
values.add(i);
}
Assert.assertEquals(Collections.emptyList(), values);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void readQueuesFromPublishers() {
// "Read Queues from Publishers"
// given: "Iterable publisher of 1000 to read queue"
List<Integer> thousand = new ArrayList<>(1000);
for (int i = 1; i <= 1000; i++) {
thousand.add(i);
}
Flux<Integer> pub = Flux.fromIterable(thousand);
Iterator<Integer> queue = pub.toIterable()
.iterator();
// when: "read the queue"
Integer v = queue.next();
Integer v2 = queue.next();
for (int i = 0; i < 997; i++) {
queue.next();
}
Integer v3 = queue.next();
// then: "queues values correct"
assertThat(v).isEqualTo(1);
assertThat(v2).isEqualTo(2);
assertThat(v3).isEqualTo(1000);
}
代码示例来源:origin: reactor/reactor-core
@Test(timeout = 5000)
public void normal() {
List<Integer> values = new ArrayList<>();
for (Integer i : Flux.range(1, 10)
.toIterable()) {
values.add(i);
}
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), values);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void fluxCanBeTranslatedToCompletableQueue() {
// "A deferred Flux can be translated into a completable queue"
// given: "a composable with an initial value"
Flux<String> stream = Flux.just("test", "test2", "test3")
.log()
.publishOn(Schedulers.parallel());
// when: "the flux is retrieved"
stream = stream.map(it -> it + "-ok")
.log();
Iterator<String> queue = stream.toIterable()
.iterator();
List<String> result = new ArrayList<>();
while (queue.hasNext()) {
result.add(queue.next());
}
// then:"it is available"
assertThat(result).containsExactly("test-ok", "test2-ok", "test3-ok");
}
代码示例来源:origin: reactor/reactor-core
@Test(timeout = 1000)
public void gh841_iteratorFromCreate() {
Iterator<String> it = Flux.<String>create(sink -> {
sink.next("a");
sink.next("b");
sink.complete();
}).sort((a, b) -> {
throw new IllegalStateException("boom");
}).toIterable().iterator();
assertThatExceptionOfType(IllegalStateException.class)
.isThrownBy(it::hasNext)
.withMessage("boom");
}
代码示例来源:origin: reactor/reactor-core
try {
ref.set(Flux.just(1, 2, 3)
.toIterable());
代码示例来源:origin: reactor/reactor-core
/**
* See https://github.com/reactor/reactor-core/issues/508
*/
@Test
public void testPublishingTwice() {
StepVerifier.create(Flux.just(Flux.range(0, 300).toIterable(), Flux.range(0, 300).toIterable())
.flatMapIterable(x -> x)
.share()
.share()
.count())
.expectNext(600L)
.verifyComplete();
}
代码示例来源:origin: spring-projects/spring-integration
iterator = Flux.from((Publisher<Object>) result).toIterable().iterator();
代码示例来源:origin: com.aol.cyclops/cyclops-reactor
/**
* @param batchSize
* @return
* @see reactor.core.publisher.Flux#toIterable(long)
*/
public final Iterable<T> toIterable(long batchSize) {
return boxed.toIterable(batchSize);
}
/**
代码示例来源:origin: com.aol.cyclops/cyclops-reactor
/**
* @return
* @see reactor.core.publisher.Flux#toIterable()
*/
public final Iterable<T> toIterable() {
return boxed.toIterable();
}
/**
代码示例来源:origin: com.aol.cyclops/cyclops-reactor
/**
* @param batchSize
* @param queueProvider
* @return
* @see reactor.core.publisher.Flux#toIterable(long, java.util.function.Supplier)
*/
public final Iterable<T> toIterable(long batchSize, Supplier<Queue<T>> queueProvider) {
return boxed.toIterable(batchSize, queueProvider);
}
/**
代码示例来源:origin: reactor/lite-rx-api-hands-on
@Override
public Iterable<User> findAll() {
callCount++;
return reactiveRepository.findAll().toIterable();
}
代码示例来源:origin: spring-cloud/spring-cloud-function
private Object result(Object input, Publisher<?> output) {
List<Object> result = new ArrayList<>();
for (Object value : Flux.from(output).toIterable()) {
result.add(value);
}
if (isSingleValue(input) && result.size() == 1) {
return result.get(0);
}
return result;
}
代码示例来源:origin: spring-cloud/spring-cloud-function
private Object result(Object input, Publisher<?> flux) {
List<Object> result = new ArrayList<>();
for (Object value : Flux.from(flux).toIterable()) {
result.add(value);
}
if (isSingleValue(input) && result.size()==1) {
return result.get(0);
}
return result;
}
代码示例来源:origin: spring-cloud/spring-cloud-function
private Object result(Object input, Publisher<?> output) {
List<O> result = new ArrayList<>();
for (Object value : Flux.from(output).toIterable()) {
result.add(convertOutput(value));
}
if (isSingleValue(input) && result.size() == 1) {
return result.get(0);
}
return result;
}
代码示例来源:origin: mohitsinha/tutorials
@Test
public void block() {
String name = Mono.just("Jesse").block();
assertEquals("Jesse", name);
Iterator<String> namesIterator = Flux.just("Tom", "Peter").toIterable().iterator();
assertEquals("Tom", namesIterator.next());
assertEquals("Peter", namesIterator.next());
assertFalse(namesIterator.hasNext());
}
内容来源于网络,如有侵权,请联系作者删除!