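This article collects code examples for the io.reactivex.Observable.publish() method in Java and shows how Observable.publish() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are intended as a practical reference. Details of Observable.publish() are as follows: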
Package path: io.reactivex.Observable
Class name: Observable
Method name: publish
Returns a ConnectableObservable, a variety of ObservableSource that waits until its ConnectableObservable#connect method is called before it begins emitting items to the Observers that have subscribed to it.
Scheduler: publish does not operate by default on a particular Scheduler.
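The behavior described above can be illustrated with a minimal sketch, assuming RxJava 2.x (the io.reactivex package) is on the classpath. The class and method names (PublishConnectSketch, run) are hypothetical and not part of RxJava. Two observers subscribe first; nothing is emitted until connect() is called, after which both observers receive every item from one shared subscription.

```java
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
class PublishConnectSketch {

    // Records what two observers see through a single shared connection.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // publish() wraps the cold range source; no items flow yet.
        ConnectableObservable<Integer> co = Observable.range(1, 3).publish();

        // Subscribing alone does not start the source.
        co.subscribe(v -> log.add("A" + v));
        co.subscribe(v -> log.add("B" + v));

        // Marker entry: everything logged after this point was caused by connect().
        log.add("connect");
        co.connect();

        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the "connect" marker is always the first entry in the log, the sketch shows that subscribing by itself does not trigger any emission.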
Code example source: ReactiveX/RxJava
// just1 is a field of the enclosing test class (not shown in this excerpt).
@Test(expected = NullPointerException.class)
public void publishFunctionNull() {
    just1.publish(null);
}
Code example source: ReactiveX/RxJava
// Excerpt: this apply() method belongs to an enclosing anonymous class that is not shown.
@Override
public ObservableSource<Object> apply(final Observable<Object> o)
        throws Exception {
    return Observable.<Integer>never().publish(new Function<Observable<Integer>, ObservableSource<Object>>() {
        @Override
        public ObservableSource<Object> apply(Observable<Integer> v)
                throws Exception {
            return o;
        }
    });
}
Code example source: ReactiveX/RxJava
@Test
public void testAlreadyUnsubscribedClient() {
    Observer<Integer> done = DisposingObserver.INSTANCE;
    Observer<Integer> o = TestHelper.mockObserver();
    Observable<Integer> result = Observable.just(1).publish().refCount();
    result.subscribe(done);
    result.subscribe(o);
    // The second observer must still receive the item and the completion signal.
    verify(o).onNext(1);
    verify(o).onComplete();
    verify(o, never()).onError(any(Throwable.class));
}
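The publish().refCount() combination used in the test above connects on the first subscriber and disconnects when the last one leaves. Below is a minimal sketch of that behavior, assuming RxJava 2.x; the class and method names (RefCountSketch, run) are hypothetical. It uses a never-ending source so that the counts are not affected by completion.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of RxJava.
class RefCountSketch {

    // Returns {upstream subscriptions, upstream disposals after the first
    // dispose, upstream disposals after the second dispose}.
    static int[] run() {
        AtomicInteger subscribed = new AtomicInteger();
        AtomicInteger disposed = new AtomicInteger();

        Observable<Integer> shared = Observable.<Integer>never()
                .doOnSubscribe(d -> subscribed.incrementAndGet())
                .doOnDispose(disposed::incrementAndGet)
                .publish()
                .refCount();

        // The first subscriber triggers the connection; the second reuses it.
        Disposable d1 = shared.subscribe();
        Disposable d2 = shared.subscribe();
        int upstreamSubscriptions = subscribed.get();

        // One subscriber remains, so the upstream stays connected.
        d1.dispose();
        int disposedAfterFirst = disposed.get();

        // The last subscriber leaves, so the upstream is disposed.
        d2.dispose();
        int disposedAfterSecond = disposed.get();

        return new int[] {upstreamSubscriptions, disposedAfterFirst, disposedAfterSecond};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```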
Code example source: ReactiveX/RxJava
// A selector that returns null is rejected with a NullPointerException.
@Test(expected = NullPointerException.class)
public void publishFunctionReturnsNull() {
    just1.publish(new Function<Observable<Integer>, Observable<Object>>() {
        @Override
        public Observable<Object> apply(Observable<Integer> v) {
            return null;
        }
    }).blockingSubscribe();
}
Code example source: ReactiveX/RxJava
// An exception thrown by the selector is delivered to the observer as onError.
@Test
public void selectorCrash() {
    Observable.just(1).publish(new Function<Observable<Integer>, ObservableSource<Object>>() {
        @Override
        public ObservableSource<Object> apply(Observable<Integer> v) throws Exception {
            throw new TestException();
        }
    })
    .test()
    .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void disposed() {
    TestHelper.checkDisposed(Observable.just(1).publish().refCount());
}
Code example source: ReactiveX/RxJava
@Test
public void empty() {
    ConnectableObservable<Integer> co = Observable.<Integer>empty().publish();
    co.connect();
}
Code example source: ReactiveX/RxJava
@Test
public void dispose() {
    TestHelper.checkDisposed(Observable.never().publish());
    TestHelper.checkDisposed(Observable.never().publish(Functions.<Observable<Object>>identity()));
}
Code example source: ReactiveX/RxJava
@Test
public void connectRace() {
    for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
        final ConnectableObservable<Integer> co = Observable.<Integer>empty().publish();
        // The same Runnable is raced against itself: two concurrent connect() calls.
        Runnable r1 = new Runnable() {
            @Override
            public void run() {
                co.connect();
            }
        };
        TestHelper.race(r1, r1);
    }
}
Code example source: ReactiveX/RxJava
@Test
public void error() {
    Observable.<Integer>error(new IOException())
    .publish()
    .refCount(500, TimeUnit.MILLISECONDS)
    .test()
    .assertFailure(IOException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void source() {
    Observable<Integer> o = Observable.never();
    // The published observable exposes its upstream through HasUpstreamObservableSource.
    assertSame(o, (((HasUpstreamObservableSource<?>)o.publish()).source()));
}
Code example source: ReactiveX/RxJava
@Test(timeout = 5000)
public void selectorLatecommer() {
    Observable.range(1, 5)
    .publish(new Function<Observable<Integer>, ObservableSource<Integer>>() {
        @Override
        public ObservableSource<Integer> apply(Observable<Integer> v) throws Exception {
            // concatWith(v) subscribes to v a second time only after the shared
            // source has finished, so the items are seen once.
            return v.concatWith(v);
        }
    })
    .test()
    .assertResult(1, 2, 3, 4, 5);
}
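The publish(Function) overload exercised in the selector tests above lets several operators inside the selector share one subscription to the source. Below is a minimal sketch, assuming RxJava 2.x; the class and method names (PublishSelectorSketch, sumWithNext, pairwiseSums) are hypothetical. It pairs each item with its successor while subscribing to the upstream only once.

```java
import io.reactivex.Observable;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of RxJava.
class PublishSelectorSketch {

    // Counts how many times the upstream source is subscribed to.
    static final AtomicInteger SUBSCRIPTIONS = new AtomicInteger();

    // Selector: both zip inputs are derived from the same shared stream.
    static Observable<Integer> sumWithNext(Observable<Integer> shared) {
        return Observable.zip(shared, shared.skip(1), (a, b) -> a + b);
    }

    static List<Integer> pairwiseSums() {
        SUBSCRIPTIONS.set(0);
        return Observable.range(1, 5)
                .doOnSubscribe(d -> SUBSCRIPTIONS.incrementAndGet())
                .publish(PublishSelectorSketch::sumWithNext)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(pairwiseSums());       // sums of adjacent items: 1+2, 2+3, 3+4, 4+5
        System.out.println(SUBSCRIPTIONS.get());  // number of upstream subscriptions
    }
}
```

Without publish, zipping range(1, 5) with range(1, 5).skip(1) would subscribe to the source twice; the selector form avoids that without requiring an explicit connect() call.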
Code example source: ReactiveX/RxJava
@Test
public void disposeOnArrival() {
    ConnectableObservable<Integer> co = Observable.<Integer>empty().publish();
    // test(true) creates a TestObserver that is disposed before it subscribes.
    co.test(true).assertEmpty();
}
Code example source: ReactiveX/RxJava
@Test
public void preNextConnect() {
    for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
        final ConnectableObservable<Integer> co = Observable.<Integer>empty().publish();
        co.connect();
        Runnable r1 = new Runnable() {
            @Override
            public void run() {
                co.test();
            }
        };
        TestHelper.race(r1, r1);
    }
}
Code example source: ReactiveX/RxJava
@Test
public void disposedUpfront() {
    ConnectableObservable<Integer> co = Observable.just(1)
            .concatWith(Observable.<Integer>never())
            .publish();
    TestObserver<Integer> to1 = co.test();
    // to2 is disposed before subscribing, so it must not receive anything.
    TestObserver<Integer> to2 = co.test(true);
    co.connect();
    to1.assertValuesOnly(1);
    to2.assertEmpty();
    ((ObservablePublish<Integer>)co).current.get().remove(null);
}
Code example source: ReactiveX/RxJava
@Test
public void testNonNullConnection() {
    ConnectableObservable<Object> source = Observable.never().publish();
    // connect() returns a non-null Disposable, including on a repeated call.
    assertNotNull(source.connect());
    assertNotNull(source.connect());
}
Code example source: ReactiveX/RxJava
@Test
public void testNoDisconnectSomeoneElse() {
    ConnectableObservable<Object> source = Observable.never().publish();
    Disposable connection1 = source.connect();
    Disposable connection2 = source.connect();
    connection1.dispose();
    Disposable connection3 = source.connect();
    connection2.dispose();
    assertTrue(checkPublishDisposed(connection1));
    assertTrue(checkPublishDisposed(connection2));
    assertFalse(checkPublishDisposed(connection3));
}
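As the connection tests above suggest, connect() returns a Disposable that represents the active connection. Below is a minimal sketch, assuming RxJava 2.x, of how that Disposable and the subscription timing affect what observers receive; the class and method names (ConnectionDisposeSketch, run) are hypothetical, and a PublishSubject stands in for a live source.

```java
import io.reactivex.disposables.Disposable;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
class ConnectionDisposeSketch {

    // Returns the items seen by an early observer and by a late observer.
    static List<List<Integer>> run() {
        PublishSubject<Integer> source = PublishSubject.create();
        ConnectableObservable<Integer> co = source.publish();

        List<Integer> early = new ArrayList<>();
        List<Integer> late = new ArrayList<>();

        co.subscribe(v -> early.add(v));
        Disposable connection = co.connect();

        source.onNext(1);                 // only the early observer is subscribed
        co.subscribe(v -> late.add(v));   // late observer misses item 1
        source.onNext(2);                 // both observers receive item 2

        connection.dispose();             // cuts the link to the source
        source.onNext(3);                 // no longer forwarded to either observer

        return Arrays.asList(early, late);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [[1, 2], [2]]
    }
}
```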
Code example source: ReactiveX/RxJava
@Test
public void take() {
    ConnectableObservable<Integer> co = Observable.range(1, 2).publish();
    TestObserver<Integer> to = co.take(1).test();
    co.connect();
    to.assertResult(1);
}
Code example source: ReactiveX/RxJava
@Test
public void mainError() {
    Observable.error(new TestException())
    .publish(Functions.<Observable<Object>>identity())
    .test()
    .assertFailure(TestException.class);
}
Code example source: ReactiveX/RxJava
@Test
public void noErrorLoss() {
    List<Throwable> errors = TestHelper.trackPluginErrors();
    try {
        ConnectableObservable<Object> co = Observable.error(new TestException()).publish();
        // Connecting with no observers: the error must reach the plugin error handler.
        co.connect();
        TestHelper.assertUndeliverable(errors, 0, TestException.class);
    } finally {
        RxJavaPlugins.reset();
    }
}