Usage and code examples of the reactor.core.publisher.Flux.publish() method

Reposted by x33g5p2x on 2022-01-19, category: Other

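This article collects code examples for the Java method reactor.core.publisher.Flux.publish() and shows how Flux.publish() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, were extracted from selected projects, and should serve as a useful reference. Details of the Flux.publish() method are as follows:
Package path: reactor.core.publisher.Flux
Class name: Flux
Method name: publish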

Introduction to Flux.publish

Prepare a ConnectableFlux which shares this Flux sequence and dispatches values to subscribers in a backpressure-aware manner. Prefetch will default to Queues#SMALL_BUFFER_SIZE. This will effectively turn any type of sequence into a hot sequence.

Backpressure will be coordinated on Subscription#request and if any Subscriber is missing demand (requested = 0), multicast will pause pushing/pulling.
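The description above can be illustrated with a minimal sketch, assuming reactor-core is on the classpath. The class name `PublishConnectSketch` and the `run()` helper are hypothetical names introduced only for this illustration. The sketch records events in a list to show that the source is not subscribed to until `connect()` is called, and that both subscribers then share the single upstream subscription.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of reactor-core.
public class PublishConnectSketch {

    // Returns the events in the order in which they were observed.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Cold source; the callback fires each time the source is subscribed to.
        Flux<Integer> source = Flux.range(1, 3)
                .doOnSubscribe(s -> events.add("source subscribed"));

        // publish() wraps the source in a ConnectableFlux; nothing runs yet.
        ConnectableFlux<Integer> hot = source.publish();

        // Subscribing to the ConnectableFlux only registers the subscribers.
        hot.subscribe(v -> events.add("A:" + v));
        hot.subscribe(v -> events.add("B:" + v));
        events.add("before connect");

        // connect() subscribes to the source once and multicasts to A and B.
        hot.connect();
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch both subscribers request an unbounded amount, so the multicast never pauses. A subscriber that requested fewer items would hold back the other one as well, as the description above states.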

Code examples

Code example source: reactor/reactor-core

@Override
protected List<Scenario<String, String>> scenarios_operatorSuccess() {
  return Arrays.asList(
      scenario(f -> f.publish().autoConnect()),
      scenario(f -> f.publish().refCount())
  );
}

Code example source: reactor/reactor-core

@Test(expected = IllegalArgumentException.class)
public void failPrefetch(){
  Flux.never()
    .publish(f -> f, -1);
}

Code example source: reactor/reactor-core

@Test(expected = IllegalArgumentException.class)
public void failPrefetch(){
  Flux.never()
    .publish( -1);
}

Code example source: reactor/reactor-core

@Test
public void advancedConnectable() throws InterruptedException {
  Flux<Integer> source = Flux.range(1, 3)
                .doOnSubscribe(s -> System.out.println("subscribed to source"));
  ConnectableFlux<Integer> co = source.publish();
  co.subscribe(System.out::println, e -> {}, () -> {});
  co.subscribe(System.out::println, e -> {}, () -> {});
  System.out.println("done subscribing");
  Thread.sleep(500);
  System.out.println("will now connect");
  co.connect();
}

Code example source: reactor/reactor-core

@Test
public void advancedConnectableAutoConnect() throws InterruptedException {
  Flux<Integer> source = Flux.range(1, 3)
                .doOnSubscribe(s -> System.out.println("subscribed to source"));
  Flux<Integer> autoCo = source.publish().autoConnect(2);
  autoCo.subscribe(System.out::println, e -> {}, () -> {});
  System.out.println("subscribed first");
  Thread.sleep(500);
  System.out.println("subscribing second");
  autoCo.subscribe(System.out::println, e -> {}, () -> {});
}

Code example source: reactor/reactor-core

@Test
public void scanMain() {
  ConnectableFlux<Integer> parent = Flux.just(10).publish();
  FluxRefCount<Integer> test = new FluxRefCount<>(parent, 17);
  assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent);
  assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(256);
}

Code example source: reactor/reactor-core

@Test
public void scanMain() throws Exception {
  ConnectableFlux<String> source = Flux.just("foo").publish();
  AssemblySnapshot stacktrace = new AssemblySnapshot(null, Traces.callSiteSupplierFactory.get());
  ConnectableFluxOnAssembly<String> test = new ConnectableFluxOnAssembly<>(source, stacktrace);
  assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(-1);
  assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(source);
}

Code example source: reactor/reactor-core

@Test
public void syncPollCompleteCalled() {
  AtomicBoolean onComplete = new AtomicBoolean();
  ConnectableFlux<Integer> f = Flux.just(1)
                  .doOnComplete(() -> onComplete.set(true))
                  .publish();
  StepVerifier.create(f)
        .then(f::connect)
        .expectNext(1)
        .verifyComplete();
  assertThat(onComplete.get()).withFailMessage("onComplete not called back").isTrue();
}

Code example source: reactor/reactor-core

@Test
public void error() {
  StepVerifier.create(Flux.error(new IllegalStateException("boom"))
              .publish()
              .refCount(1, Duration.ofMillis(500)))
        .verifyErrorMessage("boom");
}

Code example source: reactor/reactor-core

@Test
public void scanMain() {
  ConnectableFlux<Integer> parent = Flux.just(10).publish();
  FluxRefCountGrace<Integer> test = new FluxRefCountGrace<Integer>(parent, 17, Duration.ofSeconds(1), Schedulers.single());
  assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent);
  assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(256);
}

Code example source: reactor/reactor-core

@Test
public void syncPollAfterTerminateCalled() {
  AtomicBoolean onAfterTerminate = new AtomicBoolean();
  ConnectableFlux<Integer> f = Flux.just(1)
                  .doAfterTerminate(() -> onAfterTerminate.set(true))
                  .publish();
  StepVerifier.create(f)
        .then(f::connect)
        .expectNext(1)
        .verifyComplete();
  assertThat(onAfterTerminate.get()).withFailMessage("onAfterTerminate not called back").isTrue();
}

Code example source: reactor/reactor-core

@Test
public void stepNameAndToString() {
  AssemblySnapshot stacktrace = new AssemblySnapshot(null, Traces.callSiteSupplierFactory.get());
  ConnectableFluxOnAssembly<?> test = new ConnectableFluxOnAssembly<>(Flux.empty().publish(), stacktrace);
  assertThat(test.toString())
      .isEqualTo(test.stepName())
      .startsWith("reactor.core.publisher.ConnectableFluxOnAssemblyTest.stepNameAndToString(ConnectableFluxOnAssemblyTest.java:");
}

Code example source: reactor/reactor-core

@Test
public void scanOperator() {
  ConnectableFlux<String> source = Flux.just("foo").publish();
  AssemblySnapshot stacktrace = new AssemblySnapshot(null, Traces.callSiteSupplierFactory.get());
  ConnectableFluxOnAssembly<String> test = new ConnectableFluxOnAssembly<>(source, stacktrace);
  assertThat(test.scan(Scannable.Attr.ACTUAL_METADATA)).as("ACTUAL_METADATA").isTrue();
  assertThat(test.scan(Scannable.Attr.PREFETCH)).as("PREFETCH").isEqualTo(-1);
  assertThat(test.scan(Scannable.Attr.PARENT)).as("PARENT").isSameAs(source);
}

Code example source: reactor/reactor-core

@Test
public void syncFusionSingle() { //single value in the SYNC fusion
  final ConnectableFlux<String> publish = Flux.just("foo")
                 .publish();
  StepVerifier.create(publish)
        .then(publish::connect)
        .expectNext("foo")
        .expectComplete()
        .verify(Duration.ofSeconds(4));
}

Code example source: reactor/reactor-core

@Test
public void syncFusionMultiple() { //multiple values in the SYNC fusion
  final ConnectableFlux<Integer> publish = Flux.range(1, 5)
                         .publish();
  StepVerifier.create(publish)
        .then(publish::connect)
        .expectNext(1, 2, 3, 4, 5)
        .expectComplete()
        .verify(Duration.ofSeconds(4));
}

Code example source: reactor/reactor-core

@Test
public void fusedMapInvalid() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  ConnectableFlux<Integer> p = Flux.range(1, 5).map(v -> (Integer)null).publish();
  p.subscribe(ts);
  p.connect();
  ts.assertNoValues()
   .assertError(NullPointerException.class)
   .assertNotComplete();
}

Code example source: reactor/reactor-core

@Test
public void scanInner() {
  CoreSubscriber<Integer> actual = new LambdaSubscriber<>(null, e -> {}, null, sub -> sub.request(100));
  FluxRefCount<Integer> main = new FluxRefCount<Integer>(Flux.just(10).publish(), 17);
  FluxRefCount.RefCountInner<Integer> test = new FluxRefCount.RefCountInner<Integer>(actual, new FluxRefCount.RefCountMonitor<>(main));
  Subscription sub = Operators.emptySubscription();
  test.onSubscribe(sub);
  assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(sub);
  assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual);
}

Code example source: reactor/reactor-core

@Test
public void subsequentSum() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  // range and zip are assumed to be static imports from reactor.core.publisher.Flux
  range(1, 5).publish(o -> zip((Object[] a) -> (Integer) a[0] + (Integer) a[1], o, o.skip(1)))
        .subscribe(ts);
  ts.assertValues(1 + 2, 2 + 3, 3 + 4, 4 + 5)
   .assertNoError()
   .assertComplete();
}

Code example source: reactor/reactor-core

@Test
public void upstreamCompletesTwoSubscribers() {
  Flux<Integer> p = Flux.range(1, 5).publish().refCount(2);
  AssertSubscriber<Integer> ts1 = AssertSubscriber.create();
  p.subscribe(ts1);
  ts1.assertValueCount(0);
  AssertSubscriber<Integer> ts2 = AssertSubscriber.create();
  p.subscribe(ts2);
  ts1.assertValues(1, 2, 3, 4, 5);
  ts2.assertValues(1, 2, 3, 4, 5);
}

Code example source: reactor/reactor-core

@Test
public void subsequentSumHidden() {
  AssertSubscriber<Integer> ts = AssertSubscriber.create();
  // range and zip are assumed to be static imports from reactor.core.publisher.Flux
  range(1, 5).hide()
        .publish(o -> zip((Object[] a) -> (Integer) a[0] + (Integer) a[1], o, o.skip(1)))
        .subscribe(ts);
  ts.assertValues(1 + 2, 2 + 3, 3 + 4, 4 + 5)
   .assertNoError()
   .assertComplete();
}
