Usage and code examples of the reactor.core.publisher.Flux.toStream() method

Reposted by x33g5p2x on 2022-01-19, category: Other

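This article collects code examples of the Java method reactor.core.publisher.Flux.toStream() and shows how Flux.toStream() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven and similar platforms and were extracted from selected projects, so they should be useful as a reference. Details of Flux.toStream() are as follows:
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: toStream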

About Flux.toStream

Transforms this Flux into a lazy Stream, blocking for each source Subscriber#onNext(Object) call.

Note that iterating from within threads marked as "non-blocking only" is illegal and will cause an IllegalStateException to be thrown, but obtaining the Stream itself or applying lazy intermediate operations on the stream within these threads is ok.
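The difference between obtaining the Stream, attaching lazy intermediate operations, and actually iterating can be illustrated with plain java.util.stream. The sketch below does not use Reactor at all; the class name LazyStreamSketch and the recorded event strings are hypothetical and exist only for illustration. It shows that no element is pulled until the terminal operation runs, and that an onClose hook (the mechanism the Javadoc says is attached to Subscription#cancel()) fires when the stream is closed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical JDK-only illustration; not Reactor code.
class LazyStreamSketch {

    // Returns the order in which the interesting events happened.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Building the pipeline does no work: peek and onClose are only registered.
        Stream<Integer> stream = Stream.of(1, 2, 3)
                .peek(v -> events.add("pulled " + v))
                .onClose(() -> events.add("closed"));
        events.add("stream built");

        // try-with-resources guarantees that the onClose hook runs.
        try (Stream<Integer> s = stream) {
            // The terminal operation is the point where elements are pulled.
            List<Integer> values = s.limit(2).collect(Collectors.toList());
            events.add("collected " + values);
        }
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With a Stream obtained from Flux.toStream(), the same try-with-resources pattern is a reasonable way to make sure the subscription is cancelled when the stream is abandoned early, for example after limit(...).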

Code examples

Code example source: reactor/reactor-core

/**
 * Transform this {@link Flux} into a lazy {@link Stream} blocking for each source
 * {@link Subscriber#onNext(Object) onNext} call.
 *
 * <p>
 * <img class="marble" src="doc-files/marbles/toStream.svg" alt="">
 * <p>
 * Note that iterating from within threads marked as "non-blocking only" is illegal and will
 * cause an {@link IllegalStateException} to be thrown, but obtaining the {@link Stream}
 * itself or applying lazy intermediate operation on the stream within these threads is ok.
 *
 * @return a {@link Stream} of unknown size with onClose attached to {@link Subscription#cancel()}
 */
public final Stream<T> toStream() {
  return toStream(Queues.SMALL_BUFFER_SIZE);
}
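
The no-argument overload above simply delegates to toStream(int) with a default batch size. As a rough mental model of what "a lazy Stream blocking for each onNext call" with a bounded batch means, the following JDK-only sketch exposes an asynchronously produced sequence as a blocking Stream backed by a bounded queue. This is not how Reactor implements toStream internally; the class BlockingStreamSketch, the method blockingRange and the DONE sentinel are all hypothetical names introduced for illustration.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical JDK-only illustration; not Reactor's implementation.
class BlockingStreamSketch {

    // Sentinel marking the end of the sequence (stands in for onComplete).
    private static final Object DONE = new Object();

    // Exposes the asynchronously produced values 1..count as a lazy, blocking Stream.
    static Stream<Integer> blockingRange(int count, int batchSize) {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(batchSize);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i); // blocks while the bounded buffer is full
                }
                queue.put(DONE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // the consumer closed the stream early
            }
        });
        producer.setDaemon(true);
        producer.start();

        Iterator<Integer> iterator = new Iterator<Integer>() {
            private Object next;

            @Override
            public boolean hasNext() {
                if (next == null) {
                    try {
                        next = queue.take(); // blocks until the producer emits
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException(e);
                    }
                }
                return next != DONE;
            }

            @Override
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                Integer value = (Integer) next;
                next = null;
                return value;
            }
        };

        return StreamSupport
                .stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
                .onClose(producer::interrupt); // closing the stream stops the producer
    }

    public static void main(String[] args) {
        try (Stream<Integer> stream = blockingRange(1_000, 2)) {
            List<Integer> firstThree = stream.limit(3).collect(Collectors.toList());
            System.out.println(firstThree);
        }
    }
}
```

A smaller batch size keeps fewer elements buffered ahead of the consumer at the cost of more hand-offs between producer and consumer, which is the same kind of trade-off the batchSize argument of toStream(int) exposes.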

Code example source: reactor/reactor-core

@Test(timeout = 5000)
public void toStream() {
  List<Integer> values = new ArrayList<>();
  Flux.range(1, 10)
    .toStream()
    .forEach(values::add);
  Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), values);
}

Code example source: reactor/reactor-core

@Test(timeout = 5000)
public void streamLimit() {
  List<Integer> values = new ArrayList<>();
  Flux.range(1, Integer.MAX_VALUE)
    .toStream()
    .limit(10)
    .forEach(values::add);
  Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), values);
}

Code example source: reactor/reactor-core

@Test(timeout = 5000)
public void streamEmpty() {
  List<Integer> values = new ArrayList<>();
  FluxEmpty.<Integer>instance().toStream()
                 .forEach(values::add);
  Assert.assertEquals(Collections.emptyList(), values);
}

Code example source: spring-projects/spring-security

@Test
public void convertWithGrantedAuthoritiesConverter() {
  Jwt jwt = this.jwt(Collections.singletonMap("scope", "message:read message:write"));
  Converter<Jwt, Collection<GrantedAuthority>> grantedAuthoritiesConverter =
      token -> Arrays.asList(new SimpleGrantedAuthority("blah"));
  Collection<GrantedAuthority> authorities =
      new ReactiveJwtGrantedAuthoritiesConverterAdapter(grantedAuthoritiesConverter)
          .convert(jwt)
          .toStream()
          .collect(Collectors.toList());
  assertThat(authorities).containsExactly(
      new SimpleGrantedAuthority("blah"));
}

Code example source: spring-projects/spring-data-redis

@Override
public Flux<NumericResponse<SUnionStoreCommand, Long>> sUnionStore(Publisher<SUnionStoreCommand> commands) {
  return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
    Assert.notNull(command.getKeys(), "Source keys must not be null!");
    Assert.notNull(command.getKey(), "Destination key must not be null!");
    List<ByteBuffer> keys = new ArrayList<>(command.getKeys());
    keys.add(command.getKey());
    if (ClusterSlotHashUtil.isSameSlotForAllKeys(keys)) {
      return super.sUnionStore(Mono.just(command));
    }
    return sUnion(Mono.just(SUnionCommand.keys(command.getKeys()))).next().flatMap(values -> {
      Mono<Long> result = cmd.sadd(command.getKey(), values.getOutput().toStream().toArray(ByteBuffer[]::new));
      return result.map(value -> new NumericResponse<>(command, value));
    });
  }));
}

Code example source: spring-projects/spring-data-redis

@Override
public Flux<NumericResponse<SDiffStoreCommand, Long>> sDiffStore(Publisher<SDiffStoreCommand> commands) {
  return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
    Assert.notNull(command.getKeys(), "Source keys must not be null!");
    Assert.notNull(command.getKey(), "Destination key must not be null!");
    List<ByteBuffer> keys = new ArrayList<>(command.getKeys());
    keys.add(command.getKey());
    if (ClusterSlotHashUtil.isSameSlotForAllKeys(keys)) {
      return super.sDiffStore(Mono.just(command));
    }
    return sDiff(Mono.just(SDiffCommand.keys(command.getKeys()))).next().flatMap(values -> {
      Mono<Long> result = cmd.sadd(command.getKey(), values.getOutput().toStream().toArray(ByteBuffer[]::new));
      return result.map(value -> new NumericResponse<>(command, value));
    });
  }));
}

Code example source: spring-projects/spring-data-redis

@Override
public Flux<NumericResponse<SInterStoreCommand, Long>> sInterStore(Publisher<SInterStoreCommand> commands) {
  return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {
    Assert.notNull(command.getKeys(), "Source keys must not be null!");
    Assert.notNull(command.getKey(), "Destination key must not be null!");
    List<ByteBuffer> keys = new ArrayList<>(command.getKeys());
    keys.add(command.getKey());
    if (ClusterSlotHashUtil.isSameSlotForAllKeys(keys)) {
      return super.sInterStore(Mono.just(command));
    }
    return sInter(Mono.just(SInterCommand.keys(command.getKeys()))).next().flatMap(values -> {
      Mono<Long> result = cmd.sadd(command.getKey(), values.getOutput().toStream().toArray(ByteBuffer[]::new));
      return result.map(value -> new NumericResponse<>(command, value));
    });
  }));
}

Code example source: reactor/reactor-core

@Test(timeout = 1000)
public void gh841_streamFromIterable() {
  Flux<String> source = Flux.fromIterable(Arrays.asList("a","b"))
               .sort((a, b) -> { throw new IllegalStateException("boom"); });
  assertThatExceptionOfType(IllegalStateException.class)
      .isThrownBy(() -> source.toStream()
                  .collect(Collectors.toSet()))
      .withMessage("boom");
}

Code example source: reactor/reactor-core

@Test(timeout = 1000)
public void gh841_streamCreate() {
  Flux<String> source = Flux.<String>create(sink -> {
    sink.next("a");
    sink.next("b");
    sink.complete();
  })
      .sort((a, b) -> { throw new IllegalStateException("boom"); });
  assertThatExceptionOfType(IllegalStateException.class)
      .isThrownBy(() -> source.toStream()
                  .collect(Collectors.toSet()))
      .withMessage("boom");
}

Code example source: reactor/reactor-core

try {
  ref.set(Flux.just(1, 2, 3)
        .toStream());
  // ... (the rest of this excerpt is truncated in the source)

Code example source: reactor/reactor-core

@Test(timeout = 1000)
public void gh841_workaroundStream() {
  Flux<String> source = Flux.<String>create(sink -> {
    sink.next("a");
    sink.next("b");
    sink.complete();
  })
      .collectSortedList((a, b) -> { throw new IllegalStateException("boom"); })
      .hide()
      .flatMapIterable(Function.identity());

  assertThatExceptionOfType(IllegalStateException.class)
      .isThrownBy(() -> source.toStream()
                  .collect(Collectors.toSet()))
      .withMessage("boom");
}

Code example source: reactor/reactor-core

@Test(timeout = 1000)
public void gh841_streamCreateDeferredError() {
  Flux<Integer> source = Flux.<Integer>create(sink -> {
    sink.next(1);
    sink.next(2);
    sink.next(0);
    sink.complete();
  })
      .map(v -> 4 / v)
      .log();
  assertThatExceptionOfType(ArithmeticException.class)
      .isThrownBy(() -> source.toStream(1)
                  .collect(Collectors.toSet()))
      .withMessage("/ by zero");
}
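
The test above relies on the error surfacing only when the stream is consumed. The same deferred behavior can be observed with a plain java.util.stream pipeline, as in this JDK-only sketch (no Reactor involved; the class name DeferredErrorSketch is hypothetical): building the pipeline with the failing mapper throws nothing, and the ArithmeticException appears only once the terminal operation pulls the offending element.

```java
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical JDK-only illustration; not Reactor code.
class DeferredErrorSketch {

    // Returns the message of the exception raised during collection, or "no error".
    static String run() {
        // Nothing is evaluated here: map is a lazy intermediate operation.
        Stream<Integer> stream = Stream.of(1, 2, 0).map(v -> 4 / v);
        try {
            // The division by zero happens while the terminal operation iterates.
            stream.collect(Collectors.toSet());
            return "no error";
        } catch (ArithmeticException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```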

Code example source: com.aol.cyclops/cyclops-reactor

/**
 * @param batchSize
 * @return
 * @see reactor.core.publisher.Flux#toStream(int)
 */
public Stream<T> toStream(int batchSize) {
  return boxed.toStream(batchSize);
}

Code example source: apache/james-project

@Test
default void groupsWithFailedEventsShouldReturnEmptyWhenNoStored() {
  EventDeadLetters eventDeadLetters = eventDeadLetters();
  assertThat(eventDeadLetters.groupsWithFailedEvents().toStream()).isEmpty();
}

Code example source: apache/james-project

@Test
default void storeShouldIgnoreStoreDuplicatedEventsPerGroup() {
  EventDeadLetters eventDeadLetters = eventDeadLetters();
  eventDeadLetters.store(GROUP_A, EVENT_1).block();
  eventDeadLetters.store(GROUP_A, EVENT_1).block();
  eventDeadLetters.store(GROUP_A, EVENT_1).block();
  assertThat(eventDeadLetters.failedEventIds(GROUP_A).toStream())
    .containsExactly(EVENT_ID_1);
}

Code example source: apache/james-project

@Test
default void failedEventsByGroupShouldReturnEmptyWhenNonMatch() {
  EventDeadLetters eventDeadLetters = eventDeadLetters();
  eventDeadLetters.store(GROUP_A, EVENT_1).block();
  eventDeadLetters.store(GROUP_A, EVENT_2).block();
  eventDeadLetters.store(GROUP_A, EVENT_3).block();
  assertThat(eventDeadLetters.failedEventIds(GROUP_B).toStream())
    .isEmpty();
}

Code example source: apache/james-project

@Test
default void failedEventsByGroupShouldReturnAllEventsCorrespondingToGivenGroup() {
  EventDeadLetters eventDeadLetters = eventDeadLetters();
  eventDeadLetters.store(GROUP_A, EVENT_1).block();
  eventDeadLetters.store(GROUP_A, EVENT_2).block();
  eventDeadLetters.store(GROUP_B, EVENT_3).block();
  assertThat(eventDeadLetters.failedEventIds(GROUP_A).toStream())
    .containsOnly(EVENT_ID_1, EVENT_ID_2);
}

Code example source: apache/james-project

@Test
default void groupsWithFailedEventsShouldReturnAllStoredGroups() {
  EventDeadLetters eventDeadLetters = eventDeadLetters();
  eventDeadLetters.store(GROUP_A, EVENT_1).block();
  eventDeadLetters.store(GROUP_B, EVENT_1).block();
  assertThat(eventDeadLetters.groupsWithFailedEvents().toStream())
    .containsOnly(GROUP_A, GROUP_B);
}

Code example source: apache/james-project

@Test
default void failedEventsByGroupShouldNotRemoveEvents() {
  EventDeadLetters eventDeadLetters = eventDeadLetters();
  eventDeadLetters.store(GROUP_A, EVENT_1).block();
  eventDeadLetters.store(GROUP_A, EVENT_2).block();
  eventDeadLetters.store(GROUP_B, EVENT_3).block();
  eventDeadLetters.failedEventIds(GROUP_A).toStream();
  assertThat(allEventIds())
    .containsOnly(EVENT_ID_1, EVENT_ID_2, EVENT_ID_3);
}
