本文整理了Java中reactor.core.publisher.Flux.publish()
方法的一些代码示例,展示了Flux.publish()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flux.publish()
方法的具体详情如下:
包路径:reactor.core.publisher.Flux
类名称:Flux
方法名:publish
[英]Prepare a ConnectableFlux which shares this Flux sequence and dispatches values to subscribers in a backpressure-aware manner. Prefetch will default to Queues#SMALL_BUFFER_SIZE. This will effectively turn any type of sequence into a hot sequence.
Backpressure will be coordinated on Subscription#request and if any Subscriber is missing demand (requested = 0), multicast will pause pushing/pulling.
[中]准备一个可连接的通量,该通量共享该通量序列,并以背压感知的方式向用户发送值。预取将默认为队列#小_缓冲区_大小。这将有效地将任何类型的序列转换为热序列。
将根据订阅请求协调背压,如果任何订阅方缺少请求(请求=0),多播将暂停推/拉。
代码示例来源:origin: reactor/reactor-core
@Override
protected List<Scenario<String, String>> scenarios_operatorSuccess() {
return Arrays.asList(
scenario(f -> f.publish().autoConnect()),
scenario(f -> f.publish().refCount())
);
}
代码示例来源:origin: reactor/reactor-core
@Test(expected = IllegalArgumentException.class)
public void failPrefetch(){
Flux.never()
.publish(f -> f, -1);
}
代码示例来源:origin: reactor/reactor-core
@Test(expected = IllegalArgumentException.class)
public void failPrefetch(){
Flux.never()
.publish( -1);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void advancedConnectable() throws InterruptedException {
Flux<Integer> source = Flux.range(1, 3)
.doOnSubscribe(s -> System.out.println("subscribed to source"));
ConnectableFlux<Integer> co = source.publish();
co.subscribe(System.out::println, e -> {}, () -> {});
co.subscribe(System.out::println, e -> {}, () -> {});
System.out.println("done subscribing");
Thread.sleep(500);
System.out.println("will now connect");
co.connect();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void advancedConnectableAutoConnect() throws InterruptedException {
Flux<Integer> source = Flux.range(1, 3)
.doOnSubscribe(s -> System.out.println("subscribed to source"));
Flux<Integer> autoCo = source.publish().autoConnect(2);
autoCo.subscribe(System.out::println, e -> {}, () -> {});
System.out.println("subscribed first");
Thread.sleep(500);
System.out.println("subscribing second");
autoCo.subscribe(System.out::println, e -> {}, () -> {});
}
代码示例来源:origin: reactor/reactor-core
@Test
public void scanMain() {
ConnectableFlux<Integer> parent = Flux.just(10).publish();
FluxRefCount<Integer> test = new FluxRefCount<>(parent, 17);
assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent);
assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(256);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void scanMain() throws Exception {
ConnectableFlux<String> source = Flux.just("foo").publish();
AssemblySnapshot stacktrace = new AssemblySnapshot(null, Traces.callSiteSupplierFactory.get());
ConnectableFluxOnAssembly<String> test = new ConnectableFluxOnAssembly<>(source, stacktrace);
assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(-1);
assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(source);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void syncPollCompleteCalled() {
AtomicBoolean onComplete = new AtomicBoolean();
ConnectableFlux<Integer> f = Flux.just(1)
.doOnComplete(() -> onComplete.set(true))
.publish();
StepVerifier.create(f)
.then(f::connect)
.expectNext(1)
.verifyComplete();
assertThat(onComplete.get()).withFailMessage("onComplete not called back").isTrue();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void error() {
StepVerifier.create(Flux.error(new IllegalStateException("boom"))
.publish()
.refCount(1, Duration.ofMillis(500)))
.verifyErrorMessage("boom");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void scanMain() {
ConnectableFlux<Integer> parent = Flux.just(10).publish();
FluxRefCountGrace<Integer> test = new FluxRefCountGrace<Integer>(parent, 17, Duration.ofSeconds(1), Schedulers.single());
assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent);
assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(256);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void syncPollAfterTerminateCalled() {
AtomicBoolean onAfterTerminate = new AtomicBoolean();
ConnectableFlux<Integer> f = Flux.just(1)
.doAfterTerminate(() -> onAfterTerminate.set(true))
.publish();
StepVerifier.create(f)
.then(f::connect)
.expectNext(1)
.verifyComplete();
assertThat(onAfterTerminate.get()).withFailMessage("onAfterTerminate not called back").isTrue();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void stepNameAndToString() {
AssemblySnapshot stacktrace = new AssemblySnapshot(null, Traces.callSiteSupplierFactory.get());
ConnectableFluxOnAssembly<?> test = new ConnectableFluxOnAssembly<>(Flux.empty().publish(), stacktrace);
assertThat(test.toString())
.isEqualTo(test.stepName())
.startsWith("reactor.core.publisher.ConnectableFluxOnAssemblyTest.stepNameAndToString(ConnectableFluxOnAssemblyTest.java:");
}
代码示例来源:origin: reactor/reactor-core
@Test
public void scanOperator() {
ConnectableFlux<String> source = Flux.just("foo").publish();
AssemblySnapshot stacktrace = new AssemblySnapshot(null, Traces.callSiteSupplierFactory.get());
ConnectableFluxOnAssembly<String> test = new ConnectableFluxOnAssembly<>(source, stacktrace);
assertThat(test.scan(Scannable.Attr.ACTUAL_METADATA)).as("ACTUAL_METADATA").isTrue();
assertThat(test.scan(Scannable.Attr.PREFETCH)).as("PREFETCH").isEqualTo(-1);
assertThat(test.scan(Scannable.Attr.PARENT)).as("PARENT").isSameAs(source);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void syncFusionSingle() { //single value in the SYNC fusion
final ConnectableFlux<String> publish = Flux.just("foo")
.publish();
StepVerifier.create(publish)
.then(publish::connect)
.expectNext("foo")
.expectComplete()
.verify(Duration.ofSeconds(4));
}
代码示例来源:origin: reactor/reactor-core
@Test
public void syncFusionMultiple() { //multiple values in the SYNC fusion
final ConnectableFlux<Integer> publish = Flux.range(1, 5)
.publish();
StepVerifier.create(publish)
.then(publish::connect)
.expectNext(1, 2, 3, 4, 5)
.expectComplete()
.verify(Duration.ofSeconds(4));
}
}
代码示例来源:origin: reactor/reactor-core
@Test
public void fusedMapInvalid() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
ConnectableFlux<Integer> p = Flux.range(1, 5).map(v -> (Integer)null).publish();
p.subscribe(ts);
p.connect();
ts.assertNoValues()
.assertError(NullPointerException.class)
.assertNotComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void scanInner() {
CoreSubscriber<Integer> actual = new LambdaSubscriber<>(null, e -> {}, null, sub -> sub.request(100));
FluxRefCount<Integer> main = new FluxRefCount<Integer>(Flux.just(10).publish(), 17);
FluxRefCount.RefCountInner<Integer> test = new FluxRefCount.RefCountInner<Integer>(actual, new FluxRefCount.RefCountMonitor<>(main));
Subscription sub = Operators.emptySubscription();
test.onSubscribe(sub);
assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(sub);
assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual);
}
}
代码示例来源:origin: reactor/reactor-core
@Test
public void subsequentSum() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
range(1, 5).publish(o -> zip((Object[] a) -> (Integer) a[0] + (Integer) a[1], o, o.skip(1)))
.subscribe(ts);
ts.assertValues(1 + 2, 2 + 3, 3 + 4, 4 + 5)
.assertNoError()
.assertComplete();
}
代码示例来源:origin: reactor/reactor-core
@Test
public void upstreamCompletesTwoSubscribers() {
Flux<Integer> p = Flux.range(1, 5).publish().refCount(2);
AssertSubscriber<Integer> ts1 = AssertSubscriber.create();
p.subscribe(ts1);
ts1.assertValueCount(0);
AssertSubscriber<Integer> ts2 = AssertSubscriber.create();
p.subscribe(ts2);
ts1.assertValues(1, 2, 3, 4, 5);
ts2.assertValues(1, 2, 3, 4, 5);
}
代码示例来源:origin: reactor/reactor-core
@Test
public void subsequentSumHidden() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
range(1, 5).hide()
.publish(o -> zip((Object[] a) -> (Integer) a[0] + (Integer) a[1], o, o
.skip(1)))
.subscribe(ts);
ts.assertValues(1 + 2, 2 + 3, 3 + 4, 4 + 5)
.assertNoError()
.assertComplete();
}
内容来源于网络,如有侵权,请联系作者删除!