本文整理了Java中reactor.core.publisher.Flux.toStream()
方法的一些代码示例,展示了Flux.toStream()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.toStream()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:toStream
[英]Transform this Flux into a lazy Stream blocking for each source Subscriber#onNext(Object) call.
Note that iterating from within threads marked as "non-blocking only" is illegal and will cause an IllegalStateException to be thrown, but obtaining the Streamitself or applying lazy intermediate operation on the stream within these threads is ok.
[中]对于每个源订户#onNext(对象)调用,将此流量转换为惰性流阻塞。
请注意,从标记为“仅非阻塞”的线程内进行迭代是非法的,并将导致引发IllegalStateException,但在这些线程内获取流本身或对流应用惰性中间操作是可以的。
代码示例来源:origin: reactor/reactor-core
/**
* Transform this {@link Flux} into a lazy {@link Stream} blocking for each source
* {@link Subscriber#onNext(Object) onNext} call.
*
* <p>
* <img class="marble" src="doc-files/marbles/toStream.svg" alt="">
* <p>
* Note that iterating from within threads marked as "non-blocking only" is illegal and will
* cause an {@link IllegalStateException} to be thrown, but obtaining the {@link Stream}
* itself or applying lazy intermediate operation on the stream within these threads is ok.
*
* @return a {@link Stream} of unknown size with onClose attached to {@link Subscription#cancel()}
*/
public final Stream<T> toStream() {
return toStream(Queues.SMALL_BUFFER_SIZE);
}
代码示例来源:origin: reactor/reactor-core
@Test(timeout = 5000)
public void toStream() {
List<Integer> values = new ArrayList<>();
Flux.range(1, 10)
.toStream()
.forEach(values::add);
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), values);
}
代码示例来源:origin: reactor/reactor-core
@Test(timeout = 5000)
public void streamLimit() {
List<Integer> values = new ArrayList<>();
Flux.range(1, Integer.MAX_VALUE)
.toStream()
.limit(10)
.forEach(values::add);
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), values);
}
代码示例来源:origin: reactor/reactor-core
@Test(timeout = 5000)
public void streamEmpty() {
List<Integer> values = new ArrayList<>();
FluxEmpty.<Integer>instance().toStream()
.forEach(values::add);
Assert.assertEquals(Collections.emptyList(), values);
}
代码示例来源:origin: spring-projects/spring-security
@Test
public void convertWithGrantedAuthoritiesConverter() {
Jwt jwt = this.jwt(Collections.singletonMap("scope", "message:read message:write"));
Converter<Jwt, Collection<GrantedAuthority>> grantedAuthoritiesConverter =
token -> Arrays.asList(new SimpleGrantedAuthority("blah"));
Collection<GrantedAuthority> authorities =
new ReactiveJwtGrantedAuthoritiesConverterAdapter(grantedAuthoritiesConverter)
.convert(jwt)
.toStream()
.collect(Collectors.toList());
assertThat(authorities).containsExactly(
new SimpleGrantedAuthority("blah"));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<NumericResponse<SUnionStoreCommand, Long>> sUnionStore(Publisher<SUnionStoreCommand> commands) {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKeys(), "Source keys must not be null!");
Assert.notNull(command.getKey(), "Destination key must not be null!");
List<ByteBuffer> keys = new ArrayList<>(command.getKeys());
keys.add(command.getKey());
if (ClusterSlotHashUtil.isSameSlotForAllKeys(keys)) {
return super.sUnionStore(Mono.just(command));
}
return sUnion(Mono.just(SUnionCommand.keys(command.getKeys()))).next().flatMap(values -> {
Mono<Long> result = cmd.sadd(command.getKey(), values.getOutput().toStream().toArray(ByteBuffer[]::new));
return result.map(value -> new NumericResponse<>(command, value));
});
}));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<NumericResponse<SDiffStoreCommand, Long>> sDiffStore(Publisher<SDiffStoreCommand> commands) {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKeys(), "Source keys must not be null!");
Assert.notNull(command.getKey(), "Destination key must not be null!");
List<ByteBuffer> keys = new ArrayList<>(command.getKeys());
keys.add(command.getKey());
if (ClusterSlotHashUtil.isSameSlotForAllKeys(keys)) {
return super.sDiffStore(Mono.just(command));
}
return sDiff(Mono.just(SDiffCommand.keys(command.getKeys()))).next().flatMap(values -> {
Mono<Long> result = cmd.sadd(command.getKey(), values.getOutput().toStream().toArray(ByteBuffer[]::new));
return result.map(value -> new NumericResponse<>(command, value));
});
}));
}
代码示例来源:origin: spring-projects/spring-data-redis
@Override
public Flux<NumericResponse<SInterStoreCommand, Long>> sInterStore(Publisher<SInterStoreCommand> commands) {
return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKeys(), "Source keys must not be null!");
Assert.notNull(command.getKey(), "Destination key must not be null!");
List<ByteBuffer> keys = new ArrayList<>(command.getKeys());
keys.add(command.getKey());
if (ClusterSlotHashUtil.isSameSlotForAllKeys(keys)) {
return super.sInterStore(Mono.just(command));
}
return sInter(Mono.just(SInterCommand.keys(command.getKeys()))).next().flatMap(values -> {
Mono<Long> result = cmd.sadd(command.getKey(), values.getOutput().toStream().toArray(ByteBuffer[]::new));
return result.map(value -> new NumericResponse<>(command, value));
});
}));
}
代码示例来源:origin: reactor/reactor-core
@Test(timeout = 1000)
public void gh841_streamFromIterable() {
Flux<String> source = Flux.fromIterable(Arrays.asList("a","b"))
.sort((a, b) -> { throw new IllegalStateException("boom"); });
assertThatExceptionOfType(IllegalStateException.class)
.isThrownBy(() -> source.toStream()
.collect(Collectors.toSet()))
.withMessage("boom");
}
代码示例来源:origin: reactor/reactor-core
@Test(timeout = 1000)
public void gh841_streamCreate() {
Flux<String> source = Flux.<String>create(sink -> {
sink.next("a");
sink.next("b");
sink.complete();
})
.sort((a, b) -> { throw new IllegalStateException("boom"); });
assertThatExceptionOfType(IllegalStateException.class)
.isThrownBy(() -> source.toStream()
.collect(Collectors.toSet()))
.withMessage("boom");
}
代码示例来源:origin: reactor/reactor-core
try {
ref.set(Flux.just(1, 2, 3)
.toStream());
代码示例来源:origin: reactor/reactor-core
@Test(timeout = 1000)
public void gh841_workaroundStream() {
Flux<String> source = Flux.<String>create(sink -> {
sink.next("a");
sink.next("b");
sink.complete();
})
.collectSortedList((a, b) -> { throw new IllegalStateException("boom"); })
.hide()
.flatMapIterable(Function.identity());
assertThatExceptionOfType(IllegalStateException.class)
.isThrownBy(() -> source.toStream()
.collect(Collectors.toSet()))
.withMessage("boom");
}
}
代码示例来源:origin: reactor/reactor-core
@Test(timeout = 1000)
public void gh841_streamCreateDeferredError() {
Flux<Integer> source = Flux.<Integer>create(sink -> {
sink.next(1);
sink.next(2);
sink.next(0);
sink.complete();
})
.map(v -> 4 / v)
.log();
assertThatExceptionOfType(ArithmeticException.class)
.isThrownBy(() -> source.toStream(1)
.collect(Collectors.toSet()))
.withMessage("/ by zero");
}
代码示例来源:origin: com.aol.cyclops/cyclops-reactor
/**
* @param batchSize
* @return
* @see reactor.core.publisher.Flux#toStream(int)
*/
public Stream<T> toStream(int batchSize) {
return boxed.toStream(batchSize);
}
/**
代码示例来源:origin: apache/james-project
@Test
default void groupsWithFailedEventsShouldReturnEmptyWhenNoStored() {
EventDeadLetters eventDeadLetters = eventDeadLetters();
assertThat(eventDeadLetters.groupsWithFailedEvents().toStream()).isEmpty();
}
}
代码示例来源:origin: apache/james-project
@Test
default void storeShouldIgnoreStoreDuplicatedEventsPerGroup() {
EventDeadLetters eventDeadLetters = eventDeadLetters();
eventDeadLetters.store(GROUP_A, EVENT_1).block();
eventDeadLetters.store(GROUP_A, EVENT_1).block();
eventDeadLetters.store(GROUP_A, EVENT_1).block();
assertThat(eventDeadLetters.failedEventIds(GROUP_A).toStream())
.containsExactly(EVENT_ID_1);
}
代码示例来源:origin: apache/james-project
@Test
default void failedEventsByGroupShouldReturnEmptyWhenNonMatch() {
EventDeadLetters eventDeadLetters = eventDeadLetters();
eventDeadLetters.store(GROUP_A, EVENT_1).block();
eventDeadLetters.store(GROUP_A, EVENT_2).block();
eventDeadLetters.store(GROUP_A, EVENT_3).block();
assertThat(eventDeadLetters.failedEventIds(GROUP_B).toStream())
.isEmpty();
}
代码示例来源:origin: apache/james-project
@Test
default void failedEventsByGroupShouldReturnAllEventsCorrespondingToGivenGroup() {
EventDeadLetters eventDeadLetters = eventDeadLetters();
eventDeadLetters.store(GROUP_A, EVENT_1).block();
eventDeadLetters.store(GROUP_A, EVENT_2).block();
eventDeadLetters.store(GROUP_B, EVENT_3).block();
assertThat(eventDeadLetters.failedEventIds(GROUP_A).toStream())
.containsOnly(EVENT_ID_1, EVENT_ID_2);
}
代码示例来源:origin: apache/james-project
@Test
default void groupsWithFailedEventsShouldReturnAllStoredGroups() {
EventDeadLetters eventDeadLetters = eventDeadLetters();
eventDeadLetters.store(GROUP_A, EVENT_1).block();
eventDeadLetters.store(GROUP_B, EVENT_1).block();
assertThat(eventDeadLetters.groupsWithFailedEvents().toStream())
.containsOnly(GROUP_A, GROUP_B);
}
代码示例来源:origin: apache/james-project
@Test
default void failedEventsByGroupShouldNotRemoveEvents() {
EventDeadLetters eventDeadLetters = eventDeadLetters();
eventDeadLetters.store(GROUP_A, EVENT_1).block();
eventDeadLetters.store(GROUP_A, EVENT_2).block();
eventDeadLetters.store(GROUP_B, EVENT_3).block();
eventDeadLetters.failedEventIds(GROUP_A).toStream();
assertThat(allEventIds())
.containsOnly(EVENT_ID_1, EVENT_ID_2, EVENT_ID_3);
}
}
内容来源于网络,如有侵权,请联系作者删除!