reactor.core.publisher.Flux.doAfterTerminate()方法的使用及代码示例

x33g5p2x  于2022-01-19 转载在 其他  
字(9.6k)|赞(0)|评价(0)|浏览(169)

本文整理了Java中reactor.core.publisher.Flux.doAfterTerminate()方法的一些代码示例,展示了Flux.doAfterTerminate()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.doAfterTerminate()方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:doAfterTerminate

Flux.doAfterTerminate介绍

[英]Add behavior (side-effect) triggered after the Flux terminates, either by completing downstream successfully or with an error.
[中]添加流量终止后触发的行为(副作用),通过成功完成下游或出现错误。

代码示例

代码示例来源:origin: reactor/reactor-core

@Test
  public void fluxCanListenForTerminalStates() {
//    "A deferred Flux can listen for terminal states"
//        given: "a composable with an initial value"
    Flux<String> stream = Flux.just("test");

//        when: "the complete signal is observed and flux is retrieved"
    AtomicReference<Object> value = new AtomicReference<>();

    stream.doAfterTerminate(() -> value.set(Boolean.TRUE))
       .subscribe(value::set);

//        then: "it is available"
    assertThat(value.get())
        .isNotNull()
        .isNotEqualTo("test")
        .isEqualTo(Boolean.TRUE);
  }

代码示例来源:origin: reactor/reactor-core

@Test
public void syncPollAfterTerminateCalled() {
  AtomicBoolean onAfterTerminate = new AtomicBoolean();
  ConnectableFlux<Integer> f = Flux.just(1)
                  .doAfterTerminate(() -> onAfterTerminate.set(true))
                  .publish();
  StepVerifier.create(f)
        .then(f::connect)
        .expectNext(1)
        .verifyComplete();
  assertThat(onAfterTerminate.get()).withFailMessage("onAfterTerminate not called back").isTrue();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void syncPollConditionalAfterTerminateCalled() {
  AtomicBoolean onAfterTerminate = new AtomicBoolean();
  ConnectableFlux<Integer> f = Flux.just(1)
                  .doAfterTerminate(() -> onAfterTerminate.set(true))
                  .filter(v -> true)
                  .publish();
  StepVerifier.create(f)
        .then(f::connect)
        .expectNext(1)
        .verifyComplete();
  assertThat(onAfterTerminate.get()).withFailMessage("onAfterTerminate not called back").isTrue();
}

代码示例来源:origin: reactor/reactor-core

@Test
public void noFusionAfterTerminateCalled() {
  AtomicBoolean onTerminate = new AtomicBoolean();
  AssertSubscriber<Object> ts = AssertSubscriber.create();
  Flux.range(1, 2)
    .doAfterTerminate(() -> onTerminate.set(true))
    .subscribe(ts);
  ts.assertNoError()
   .assertValues(1, 2)
   .assertComplete();
  Assert.assertTrue("onComplete not called back", onTerminate.get());
}

代码示例来源:origin: reactor/reactor-core

@Test
public void syncdoAfterTerminateCalled() {
  AtomicBoolean onTerminate = new AtomicBoolean();
  AssertSubscriber<Object> ts = AssertSubscriber.create();
  Flux.range(1, 2)
    .hide()
    .doAfterTerminate(() -> onTerminate.set(true))
    .subscribe(ts);
  ts.assertNoError()
   .assertValues(1, 2)
   .assertComplete();
  Assert.assertTrue("onComplete not called back", onTerminate.get());
}

代码示例来源:origin: reactor/reactor-core

combinedScenarios.addAll(Arrays.asList(scenario(f -> f.doAfterTerminate(() -> {
      throw droppedException();
    })).fusionMode(Fuseable.NONE)

代码示例来源:origin: reactor/reactor-core

@Override
  Flux<Integer> transformFlux(Flux<Integer> f) {
    Flux<String> otherStream = Flux.just("test", "test2", "test3");
//        System.out.println("Providing new downstream");

    Scheduler asyncGroup = Schedulers.newParallel("flux-p-tck", 2);

    BiFunction<Integer, String, Integer> combinator = (t1, t2) -> t1;

    return f.publishOn(sharedGroup)
        .parallel(2)
        .groups()
        .flatMap(stream -> stream.publishOn(asyncGroup)
                     .doOnNext(this::monitorThreadUse)
                     .scan((prev, next) -> next)
                     .map(integer -> -integer)
                     .filter(integer -> integer <= 0)
                     .map(integer -> -integer)
                     .bufferTimeout(batch, Duration.ofMillis(50))
                     .flatMap(Flux::fromIterable)
                     .flatMap(i -> Flux.zip(Flux.just(i), otherStream, combinator))
         )
        .publishOn(sharedGroup)
        .doAfterTerminate(asyncGroup::dispose)
        .doOnError(Throwable::printStackTrace);
  }

代码示例来源:origin: reactor/reactor-core

@Test
public void sampleTest() throws Exception {
  CountDownLatch latch = new CountDownLatch(1);
  Disposable top10every1second =
   Flux.fromIterable(PULP_SAMPLE)
       .publishOn(asyncGroup)
       .flatMap(samuelJackson ->
      Flux
       .fromArray(samuelJackson.split(" "))
       .publishOn(asyncGroup)
       .filter(w -> !w.trim().isEmpty())
       .doOnNext(i -> simulateLatency())
    )
       .window(Duration.ofSeconds(2))
       .flatMap(s -> s.groupBy(w -> w)
              .flatMap(w -> w.count().map(c -> Tuples.of(w.key(), c)))
              .collectSortedList((a, b) -> -a.getT2().compareTo(b.getT2()))
              .flatMapMany(Flux::fromIterable)
              .take(10)
              .doAfterTerminate(() -> LOG.info("------------------------ window terminated" +
             "----------------------"))
    )
       .subscribe(
     entry -> LOG.info(entry.getT1() + ": " + entry.getT2()),
     error -> LOG.error("", error),
          latch::countDown
    );
  awaitLatch(top10every1second, latch);
}

代码示例来源:origin: reactor/reactor-core

})).producerEmpty(),
scenario(f -> f.doAfterTerminate(() -> {
          throw exception();
        })).producerEmpty(),

代码示例来源:origin: reactor/reactor-core

@Override
protected List<Scenario<String, String>> scenarios_operatorSuccess() {
  return Arrays.asList(scenario(f -> f.doOnSubscribe(s -> {
      })),
      scenario(f -> f.doOnError(s -> {
      })),
      scenario(f -> f.doOnTerminate(() -> {
      })),
      scenario(f -> f.doAfterTerminate(() -> {
      })),
      scenario(f -> f.doOnCancel(() -> {
      })),
      scenario(f -> f.doOnComplete(() -> {
      })),
      scenario(f -> f.doOnRequest(d -> {
      })),
      scenario(f -> f.doOnRequest(s -> {
        throw new RuntimeException(); //ignored
      })),
      scenario(f -> f.doOnNext(s -> {
      })),
      scenario(f -> f.doOnError(s -> {
      })));
}

代码示例来源:origin: com.aol.cyclops/cyclops-reactor

/**
 * @param afterTerminate
 * @return
 * @see reactor.core.publisher.Flux#doAfterTerminate(java.lang.Runnable)
 */
public final Flux<T> doAfterTerminate(Runnable afterTerminate) {
  return boxed.doAfterTerminate(afterTerminate);
}
/**

代码示例来源:origin: reactor/reactor-kafka

@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck() {
  this.ackMode = AckMode.AUTO_ACK;
  Flux<ConsumerRecords<K, V>> flux = withDoOnRequest(createConsumerFlux());
  return flux
      .map(consumerRecords -> Flux.fromIterable(consumerRecords)
                    .doAfterTerminate(() -> {
                      for (ConsumerRecord<K, V> r : consumerRecords)
                        new CommittableOffset(r).acknowledge();
                    }));
}

代码示例来源:origin: io.projectreactor.kafka/reactor-kafka

@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck() {
  this.ackMode = AckMode.AUTO_ACK;
  Flux<ConsumerRecords<K, V>> flux = withDoOnRequest(createConsumerFlux());
  return flux
      .map(consumerRecords -> Flux.fromIterable(consumerRecords)
                    .doAfterTerminate(() -> {
                      for (ConsumerRecord<K, V> r : consumerRecords)
                        new CommittableOffset(r).acknowledge();
                    }));
}

代码示例来源:origin: io.projectreactor.kafka/reactor-kafka

private synchronized Flux<ConsumerRecords<K, V>> createConsumerFlux() {
  if (consumerFlux != null)
    throw new IllegalStateException("Multiple subscribers are not supported for KafkaReceiver flux");
  Consumer<Flux<?>> kafkaSubscribeOrAssign = (flux) -> receiverOptions.subscriber(this).accept(consumer);
  initEvent = new InitEvent(kafkaSubscribeOrAssign);
  pollEvent = new PollEvent();
  commitEvent = new CommitEvent();
  recordEmitter = EmitterProcessor.create();
  recordSubmission = recordEmitter.sink();
  scheduler = Schedulers.single(receiverOptions.schedulerSupplier().get());
  consumerFlux = recordEmitter
      .publishOn(scheduler)
      .doOnSubscribe(s -> {
        try {
          start();
        } catch (Exception e) {
          log.error("Subscription to event flux failed", e);
          throw e;
        }
      })
      .doOnRequest(r -> {
        if (requestsPending.get() > 0)
          pollEvent.scheduleIfRequired();
      })
      .doAfterTerminate(this::dispose)
      .doOnCancel(this::dispose);
  return consumerFlux;
}

代码示例来源:origin: io.projectreactor.kafka/reactor-kafka

private Flux<ConsumerRecord<K, V>> transactionalRecords(TransactionManager transactionManager, ConsumerRecords<K, V> records) {
  if (records.isEmpty())
    return Flux.empty();
  CommittableBatch offsetBatch = new CommittableBatch();
  for (ConsumerRecord<K, V> r : records)
    offsetBatch.updateOffset(new TopicPartition(r.topic(), r.partition()), r.offset());
  return Flux.fromIterable(records)
        .concatWith(transactionManager.sendOffsets(offsetBatch.getAndClearOffsets().offsets(), receiverOptions.groupId()))
        .doAfterTerminate(() -> awaitingTransaction.set(false));
}

代码示例来源:origin: reactor/reactor-kafka

private Flux<ConsumerRecord<K, V>> transactionalRecords(TransactionManager transactionManager, ConsumerRecords<K, V> records) {
  if (records.isEmpty())
    return Flux.empty();
  CommittableBatch offsetBatch = new CommittableBatch();
  for (ConsumerRecord<K, V> r : records)
    offsetBatch.updateOffset(new TopicPartition(r.topic(), r.partition()), r.offset());
  return Flux.fromIterable(records)
        .concatWith(transactionManager.sendOffsets(offsetBatch.getAndClearOffsets().offsets(), receiverOptions.groupId()))
        .doAfterTerminate(() -> awaitingTransaction.set(false));
}

代码示例来源:origin: reactor/reactor-kafka

private synchronized Flux<ConsumerRecords<K, V>> createConsumerFlux() {
  if (consumerFlux != null)
    throw new IllegalStateException("Multiple subscribers are not supported for KafkaReceiver flux");
  Consumer<Flux<?>> kafkaSubscribeOrAssign = (flux) -> receiverOptions.subscriber(this).accept(consumer);
  initEvent = new InitEvent(kafkaSubscribeOrAssign);
  pollEvent = new PollEvent();
  commitEvent = new CommitEvent();
  recordEmitter = EmitterProcessor.create();
  recordSubmission = recordEmitter.sink();
  scheduler = Schedulers.single(receiverOptions.schedulerSupplier().get());
  consumerFlux = recordEmitter
      .publishOn(scheduler)
      .doOnSubscribe(s -> {
        try {
          start();
        } catch (Exception e) {
          log.error("Subscription to event flux failed", e);
          throw e;
        }
      })
      .doOnRequest(r -> {
        if (requestsPending.get() > 0)
          pollEvent.scheduleIfRequired();
      })
      .doAfterTerminate(this::dispose)
      .doOnCancel(this::dispose);
  return consumerFlux;
}

代码示例来源:origin: reactor/reactor-netty

.doAfterTerminate(encoder);

代码示例来源:origin: io.projectreactor.netty/reactor-netty

.doAfterTerminate(encoder);

代码示例来源:origin: io.projectreactor.ipc/reactor-netty

.doAfterTerminate(encoder);

相关文章

Flux类方法