reactor.core.publisher.Flux.filter()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(7.9k)|赞(0)|评价(0)|浏览(396)

本文整理了Java中reactor.core.publisher.Flux.filter()方法的一些代码示例,展示了Flux.filter()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.filter()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:filter

Flux.filter介绍

[英]Evaluate each source value against the given Predicate. If the predicate test succeeds, the value is emitted. If the predicate test fails, the value is ignored and a request of 1 is made upstream.
[中]根据给定谓词计算每个源值。如果谓词测试成功,则会发出该值。如果谓词测试失败,则忽略该值,并向上游请求1。

代码示例

代码示例来源:origin: codecentric/spring-boot-admin

/**
 * List all registered instances with name
 *
 * @param name the name to search for
 * @return application list
 */
@GetMapping(path = "/instances", produces = MediaType.APPLICATION_JSON_VALUE, params = "name")
public Flux<Instance> instances(@RequestParam("name") String name) {
  return registry.getInstances(name).filter(Instance::isRegistered);
}

代码示例来源:origin: codecentric/spring-boot-admin

/**
 * List all registered instances with name
 *
 * @return application list
 */
@GetMapping(path = "/instances", produces = MediaType.APPLICATION_JSON_VALUE)
public Flux<Instance> instances() {
  LOGGER.debug("Deliver all registered instances");
  return registry.getInstances().filter(Instance::isRegistered);
}

代码示例来源:origin: codecentric/spring-boot-admin

protected Mono<Void> updateStatusForAllInstances() {
  log.debug("Updating status for all instances");
  Instant expiryInstant = Instant.now().minus(statusLifetime);
  return Flux.fromIterable(lastQueried.entrySet())
        .filter(e -> e.getValue().isBefore(expiryInstant))
        .map(Map.Entry::getKey)
        .flatMap(this::updateStatus)
        .then();
}

代码示例来源:origin: codecentric/spring-boot-admin

@Override
protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
  return publisher.subscribeOn(Schedulers.newSingle("info-updater"))
          .filter(event -> event instanceof InstanceEndpointsDetectedEvent ||
                   event instanceof InstanceStatusChangedEvent ||
                   event instanceof InstanceRegistrationUpdatedEvent)
          .flatMap(this::updateInfo);
}

代码示例来源:origin: codecentric/spring-boot-admin

@GetMapping(path = "/applications/{name}", produces = MediaType.APPLICATION_JSON_VALUE)
public Mono<ResponseEntity<Application>> application(@PathVariable("name") String name) {
  return this.toApplication(name, registry.getInstances(name).filter(Instance::isRegistered))
        .filter(a -> !a.getInstances().isEmpty())
        .map(ResponseEntity::ok)
        .defaultIfEmpty(ResponseEntity.notFound().build());
}

代码示例来源:origin: codecentric/spring-boot-admin

@Override
protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
  return publisher.subscribeOn(Schedulers.newSingle("endpoint-detector"))
          .filter(event -> event instanceof InstanceStatusChangedEvent ||
                   event instanceof InstanceRegistrationUpdatedEvent)
          .flatMap(this::detectEndpoints);
}

代码示例来源:origin: codecentric/spring-boot-admin

@Override
public Flux<Instance> findByName(String name) {
  return this.findAll().filter(a -> a.isRegistered() && name.equals(a.getRegistration().getName()));
}

代码示例来源:origin: codecentric/spring-boot-admin

@Override
public Flux<Instance> findByName(String name) {
  return findAll().filter(a -> a.isRegistered() && name.equals(a.getRegistration().getName()));
}

代码示例来源:origin: codecentric/spring-boot-admin

protected Mono<Void> sendReminders() {
  Instant now = Instant.now();
  return Flux.fromIterable(this.reminders.values())
        .filter(reminder -> reminder.getLastNotification().plus(reminderPeriod).isBefore(now))
        .flatMap(reminder -> delegate.notify(reminder.getEvent())
                      .doOnSuccess(signal -> reminder.setLastNotification(now)))
        .then();
}

代码示例来源:origin: resilience4j/resilience4j

@RequestMapping(value = "stream/events/{circuitBreakerName}/{eventType}", produces = MEDIA_TYPE_TEXT_EVENT_STREAM)
public SseEmitter getEventsStreamFilteredByCircuitBreakerNameAndEventType(@PathVariable("circuitBreakerName") String circuitBreakerName,
                                  @PathVariable("eventType") String eventType) {
  CircuitBreaker circuitBreaker = circuitBreakerRegistry.getAllCircuitBreakers()
      .find(cb -> cb.getName().equals(circuitBreakerName))
      .getOrElseThrow(() ->
          new IllegalArgumentException(String.format("circuit breaker with name %s not found", circuitBreakerName)));
  Flux<CircuitBreakerEvent> eventStream = toFlux(circuitBreaker.getEventPublisher())
      .filter(event -> event.getEventType() == CircuitBreakerEvent.Type.valueOf(eventType.toUpperCase()));
  return CircuitBreakerEventEmitter.createSseEmitter(eventStream);
}

代码示例来源:origin: spring-projects/spring-security

private <T extends WebFilter> Optional<T> getWebFilter(SecurityWebFilterChain filterChain, Class<T> filterClass) {
  return (Optional<T>) filterChain.getWebFilters()
      .filter(Objects::nonNull)
      .filter(filter -> filter.getClass().isAssignableFrom(filterClass))
      .singleOrEmpty()
      .blockOptional();
}

代码示例来源:origin: codecentric/spring-boot-admin

protected Tuple2<String, Flux<Instance>> getApplicationForInstance(Instance instance) {
  String name = instance.getRegistration().getName();
  return Tuples.of(name, registry.getInstances(name).filter(Instance::isRegistered));
}

代码示例来源:origin: codecentric/spring-boot-admin

@GetMapping(path = "/instances/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Instance>> instanceStream(@PathVariable String id) {
  return Flux.from(eventStore)
        .filter(event -> event.getInstance().equals(InstanceId.of(id)))
        .flatMap(event -> registry.getInstance(event.getInstance()))
        .map(event -> ServerSentEvent.builder(event).build())
        .mergeWith(ping());
}

代码示例来源:origin: codecentric/spring-boot-admin

@Override
protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
  return publisher.subscribeOn(Schedulers.newSingle("status-updater"))
          .filter(event -> event instanceof InstanceRegisteredEvent ||
                   event instanceof InstanceRegistrationUpdatedEvent)
          .flatMap(event -> updateStatus(event.getInstance()));
}

代码示例来源:origin: codecentric/spring-boot-admin

@GetMapping(path = "/applications", produces = MediaType.APPLICATION_JSON_VALUE)
public Flux<Application> applications() {
  return registry.getInstances()
          .filter(Instance::isRegistered)
          .groupBy(instance -> instance.getRegistration().getName())
          .flatMap(grouped -> toApplication(grouped.key(), grouped));
}

代码示例来源:origin: reactor/reactor-core

@Test
public void namedHideFluxTest() {
  Flux<Integer> named1 =
      Flux.range(1, 10)
        .hide()
        .name("100s");
  Flux<Integer> named2 = named1.filter(i -> i % 3 == 0)
                 .name("multiple of 3 100s")
                 .hide();
  assertThat(Scannable.from(named1).name()).isEqualTo("100s");
  assertThat(Scannable.from(named2).name()).isEqualTo("multiple of 3 100s");
}

代码示例来源:origin: reactor/reactor-core

@Test
public void discardOnNextPredicateFail() {
  StepVerifier.create(Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) //range uses tryOnNext, so let's use just instead
              .filter(i -> { throw new IllegalStateException("boom"); })
  )
        .expectFusion(Fuseable.NONE)
        .expectErrorMessage("boom")
        .verifyThenAssertThat()
        .hasDiscardedExactly(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void discardTryOnNextPredicateFail() {
  StepVerifier.create(Flux.range(1, 10) //range uses tryOnNext
              .filter(i -> { throw new IllegalStateException("boom"); })
  )
        .expectFusion(Fuseable.NONE)
        .expectErrorMessage("boom")
        .verifyThenAssertThat()
        .hasDiscardedExactly(1);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void syncFusedThreadBarrierConditional() {
  StepVerifier.create(Flux.range(1, 5)
              .doFinally(this)
              .filter(i -> true))
        .expectFusion(SYNC | THREAD_BARRIER, NONE)
        .expectNext(1, 2, 3, 4, 5)
        .expectComplete()
        .verify();
  assertEquals(1, calls);
  assertEquals(SignalType.ON_COMPLETE, signalType);
}

代码示例来源:origin: reactor/reactor-core

@Test
public void discardPollAsyncPredicateMiss() {
  StepVerifier.create(Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) //range uses tryOnNext, so let's use just instead
              .publishOn(Schedulers.newSingle("discardPollAsync"))
              .filter(i -> i % 2 == 0)
  )
        .expectFusion(Fuseable.ASYNC)
        .expectNextCount(5)
        .expectComplete()
        .verifyThenAssertThat()
        .hasDiscardedExactly(1, 3, 5, 7, 9);
}

相关文章

Flux类方法