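This article collects code examples for the Java method reactor.core.publisher.Flux.filter() and shows how Flux.filter() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Flux.filter() method are as follows: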
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: filter
Description: Evaluate each source value against the given Predicate. If the predicate test succeeds, the value is emitted. If the predicate test fails, the value is ignored and a request of 1 is made upstream.
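The description above says that a rejected value triggers a request of 1 upstream. The following is a minimal sketch of that bookkeeping, written against the JDK's java.util.concurrent.Flow interfaces so that it runs without Reactor on the classpath. It is not Reactor's implementation: the names FilterDemandSketch, RangePublisher, FilterSubscriber, and run are hypothetical, the publisher is synchronous and single-threaded, and validation such as rejecting non-positive requests is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

// Hypothetical illustration of the demand rule; not Reactor's filter operator.
final class FilterDemandSketch {

    /** Synchronous source of 1..count that records every request(n) it receives. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        final int count;
        final List<Long> requests = new ArrayList<>();

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                int next = 1;
                long demand;
                boolean emitting;
                boolean done;

                @Override
                public void request(long n) {
                    requests.add(n);
                    demand += n;
                    if (emitting) {
                        return; // re-entrant call from onNext: the loop below picks up the new demand
                    }
                    emitting = true;
                    while (demand > 0 && !done && next <= count) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    if (!done && next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                    emitting = false;
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Forwards values that pass the predicate; asks upstream for one more when a value is dropped. */
    static final class FilterSubscriber<T> implements Flow.Subscriber<T> {
        final Flow.Subscriber<? super T> downstream;
        final Predicate<? super T> predicate;
        Flow.Subscription upstream;

        FilterSubscriber(Flow.Subscriber<? super T> downstream, Predicate<? super T> predicate) {
            this.downstream = downstream;
            this.predicate = predicate;
        }

        @Override public void onSubscribe(Flow.Subscription s) { upstream = s; downstream.onSubscribe(s); }

        @Override
        public void onNext(T item) {
            if (predicate.test(item)) {
                downstream.onNext(item);
            } else {
                upstream.request(1); // replace the demand that the dropped value consumed
            }
        }

        @Override public void onError(Throwable t) { downstream.onError(t); }
        @Override public void onComplete() { downstream.onComplete(); }
    }

    /** Subscribes through the filter with a fixed downstream demand and returns what arrived. */
    static List<Integer> run(RangePublisher source, Predicate<Integer> predicate, long demand) {
        List<Integer> received = new ArrayList<>();
        Flow.Subscriber<Integer> collector = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(demand); }
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        };
        source.subscribe(new FilterSubscriber<Integer>(collector, predicate));
        return received;
    }

    public static void main(String[] args) {
        RangePublisher source = new RangePublisher(10);
        List<Integer> received = run(source, i -> i % 2 == 0, 3);
        System.out.println("received = " + received);                 // received = [2, 4, 6]
        System.out.println("upstream requests = " + source.requests); // upstream requests = [3, 1, 1, 1]
    }
}
```

With a downstream demand of 3 and a predicate that keeps even numbers, the subscriber receives 2, 4, 6 while the source records requests of 3, 1, 1, 1. Each dropped odd value is compensated by one extra upstream request, so filtering does not silently shrink the demand the subscriber asked for.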
Code example source: codecentric/spring-boot-admin
/**
 * List all registered instances with name
 *
 * @param name the name to search for
 * @return application list
 */
@GetMapping(path = "/instances", produces = MediaType.APPLICATION_JSON_VALUE, params = "name")
public Flux<Instance> instances(@RequestParam("name") String name) {
    return registry.getInstances(name).filter(Instance::isRegistered);
}
Code example source: codecentric/spring-boot-admin
/**
 * List all registered instances
 *
 * @return application list
 */
@GetMapping(path = "/instances", produces = MediaType.APPLICATION_JSON_VALUE)
public Flux<Instance> instances() {
    LOGGER.debug("Deliver all registered instances");
    return registry.getInstances().filter(Instance::isRegistered);
}
Code example source: codecentric/spring-boot-admin
protected Mono<Void> updateStatusForAllInstances() {
    log.debug("Updating status for all instances");
    Instant expiryInstant = Instant.now().minus(statusLifetime);
    return Flux.fromIterable(lastQueried.entrySet())
               .filter(e -> e.getValue().isBefore(expiryInstant))
               .map(Map.Entry::getKey)
               .flatMap(this::updateStatus)
               .then();
}
Code example source: codecentric/spring-boot-admin
@Override
protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
    return publisher.subscribeOn(Schedulers.newSingle("info-updater"))
                    .filter(event -> event instanceof InstanceEndpointsDetectedEvent ||
                                     event instanceof InstanceStatusChangedEvent ||
                                     event instanceof InstanceRegistrationUpdatedEvent)
                    .flatMap(this::updateInfo);
}
Code example source: codecentric/spring-boot-admin
@GetMapping(path = "/applications/{name}", produces = MediaType.APPLICATION_JSON_VALUE)
public Mono<ResponseEntity<Application>> application(@PathVariable("name") String name) {
    return this.toApplication(name, registry.getInstances(name).filter(Instance::isRegistered))
               .filter(a -> !a.getInstances().isEmpty())
               .map(ResponseEntity::ok)
               .defaultIfEmpty(ResponseEntity.notFound().build());
}
Code example source: codecentric/spring-boot-admin
@Override
protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
    return publisher.subscribeOn(Schedulers.newSingle("endpoint-detector"))
                    .filter(event -> event instanceof InstanceStatusChangedEvent ||
                                     event instanceof InstanceRegistrationUpdatedEvent)
                    .flatMap(this::detectEndpoints);
}
Code example source: codecentric/spring-boot-admin
@Override
public Flux<Instance> findByName(String name) {
    return this.findAll().filter(a -> a.isRegistered() && name.equals(a.getRegistration().getName()));
}
Code example source: codecentric/spring-boot-admin
@Override
public Flux<Instance> findByName(String name) {
    return findAll().filter(a -> a.isRegistered() && name.equals(a.getRegistration().getName()));
}
Code example source: codecentric/spring-boot-admin
protected Mono<Void> sendReminders() {
    Instant now = Instant.now();
    return Flux.fromIterable(this.reminders.values())
               .filter(reminder -> reminder.getLastNotification().plus(reminderPeriod).isBefore(now))
               .flatMap(reminder -> delegate.notify(reminder.getEvent())
                                            .doOnSuccess(signal -> reminder.setLastNotification(now)))
               .then();
}
Code example source: resilience4j/resilience4j
@RequestMapping(value = "stream/events/{circuitBreakerName}/{eventType}", produces = MEDIA_TYPE_TEXT_EVENT_STREAM)
public SseEmitter getEventsStreamFilteredByCircuitBreakerNameAndEventType(@PathVariable("circuitBreakerName") String circuitBreakerName,
                                                                          @PathVariable("eventType") String eventType) {
    CircuitBreaker circuitBreaker = circuitBreakerRegistry.getAllCircuitBreakers()
            .find(cb -> cb.getName().equals(circuitBreakerName))
            .getOrElseThrow(() ->
                    new IllegalArgumentException(String.format("circuit breaker with name %s not found", circuitBreakerName)));
    Flux<CircuitBreakerEvent> eventStream = toFlux(circuitBreaker.getEventPublisher())
            .filter(event -> event.getEventType() == CircuitBreakerEvent.Type.valueOf(eventType.toUpperCase()));
    return CircuitBreakerEventEmitter.createSseEmitter(eventStream);
}
Code example source: spring-projects/spring-security
private <T extends WebFilter> Optional<T> getWebFilter(SecurityWebFilterChain filterChain, Class<T> filterClass) {
    return (Optional<T>) filterChain.getWebFilters()
            .filter(Objects::nonNull)
            .filter(filter -> filter.getClass().isAssignableFrom(filterClass))
            .singleOrEmpty()
            .blockOptional();
}
Code example source: codecentric/spring-boot-admin
protected Tuple2<String, Flux<Instance>> getApplicationForInstance(Instance instance) {
    String name = instance.getRegistration().getName();
    return Tuples.of(name, registry.getInstances(name).filter(Instance::isRegistered));
}
Code example source: codecentric/spring-boot-admin
@GetMapping(path = "/instances/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Instance>> instanceStream(@PathVariable String id) {
    return Flux.from(eventStore)
               .filter(event -> event.getInstance().equals(InstanceId.of(id)))
               .flatMap(event -> registry.getInstance(event.getInstance()))
               .map(event -> ServerSentEvent.builder(event).build())
               .mergeWith(ping());
}
Code example source: codecentric/spring-boot-admin
@Override
protected Publisher<Void> handle(Flux<InstanceEvent> publisher) {
    return publisher.subscribeOn(Schedulers.newSingle("status-updater"))
                    .filter(event -> event instanceof InstanceRegisteredEvent ||
                                     event instanceof InstanceRegistrationUpdatedEvent)
                    .flatMap(event -> updateStatus(event.getInstance()));
}
Code example source: codecentric/spring-boot-admin
@GetMapping(path = "/applications", produces = MediaType.APPLICATION_JSON_VALUE)
public Flux<Application> applications() {
    return registry.getInstances()
                   .filter(Instance::isRegistered)
                   .groupBy(instance -> instance.getRegistration().getName())
                   .flatMap(grouped -> toApplication(grouped.key(), grouped));
}
Code example source: reactor/reactor-core
@Test
public void namedHideFluxTest() {
    Flux<Integer> named1 =
            Flux.range(1, 10)
                .hide()
                .name("100s");
    Flux<Integer> named2 = named1.filter(i -> i % 3 == 0)
                                 .name("multiple of 3 100s")
                                 .hide();
    assertThat(Scannable.from(named1).name()).isEqualTo("100s");
    assertThat(Scannable.from(named2).name()).isEqualTo("multiple of 3 100s");
}
Code example source: reactor/reactor-core
@Test
public void discardOnNextPredicateFail() {
    StepVerifier.create(Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) //range uses tryOnNext, so let's use just instead
                            .filter(i -> { throw new IllegalStateException("boom"); })
    )
                .expectFusion(Fuseable.NONE)
                .expectErrorMessage("boom")
                .verifyThenAssertThat()
                .hasDiscardedExactly(1);
}
Code example source: reactor/reactor-core
@Test
public void discardTryOnNextPredicateFail() {
    StepVerifier.create(Flux.range(1, 10) //range uses tryOnNext
                            .filter(i -> { throw new IllegalStateException("boom"); })
    )
                .expectFusion(Fuseable.NONE)
                .expectErrorMessage("boom")
                .verifyThenAssertThat()
                .hasDiscardedExactly(1);
}
Code example source: reactor/reactor-core
@Test
public void syncFusedThreadBarrierConditional() {
    StepVerifier.create(Flux.range(1, 5)
                            .doFinally(this)
                            .filter(i -> true))
                .expectFusion(SYNC | THREAD_BARRIER, NONE)
                .expectNext(1, 2, 3, 4, 5)
                .expectComplete()
                .verify();
    assertEquals(1, calls);
    assertEquals(SignalType.ON_COMPLETE, signalType);
}
Code example source: reactor/reactor-core
@Test
public void discardPollAsyncPredicateMiss() {
    StepVerifier.create(Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) //range uses tryOnNext, so let's use just instead
                            .publishOn(Schedulers.newSingle("discardPollAsync"))
                            .filter(i -> i % 2 == 0)
    )
                .expectFusion(Fuseable.ASYNC)
                .expectNextCount(5)
                .expectComplete()
                .verifyThenAssertThat()
                .hasDiscardedExactly(1, 3, 5, 7, 9);
}