本文整理了Java中io.reactivex.Observable.doOnDispose()
方法的一些代码示例,展示了Observable.doOnDispose()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.doOnDispose()
方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:doOnDispose
[英]Calls the dispose Action if the downstream disposes the sequence.
The action is shared between subscriptions and thus may be called concurrently from multiple threads; the action must be thread safe.
If the action throws a runtime exception, that exception is rethrown by the dispose() call, sometimes as a CompositeException if there were multiple exceptions along the way.
Scheduler: doOnDispose does not operate by default on a particular Scheduler.
[中]如果下游处理序列,则调用dispose操作。
该操作在订阅之间共享,因此可以从多个线程同时调用;操作必须是线程安全的。
如果该操作引发运行时异常,则该异常将由dispose()调用重新引发,如果在此过程中出现多个异常,有时会作为CompositeException。
调度器:默认情况下,doOnDispose不会在特定的调度器上运行。
代码示例来源:origin: ReactiveX/RxJava
@Override
public ObservableSource<Integer> apply(Integer v) throws Exception {
return Observable.just(v).doOnDispose(new Action() {
@Override
public void run() throws Exception {
counter.getAndIncrement();
}
});
}
})
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void doOnDisposeNull() {
just1.doOnDispose(null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testUnsubscribeSource() throws Exception {
Action unsubscribe = mock(Action.class);
Observable<Integer> o = Observable.just(1).doOnDispose(unsubscribe).cache();
o.subscribe();
o.subscribe();
o.subscribe();
verify(unsubscribe, never()).run();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testUnsubscribeSource() throws Exception {
Action unsubscribe = mock(Action.class);
Observable<Integer> o = Observable.just(1).doOnDispose(unsubscribe).replay().autoConnect();
o.subscribe();
o.subscribe();
o.subscribe();
verify(unsubscribe, never()).run();
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public void onNext(Integer t) {
if (valueCount() == 2) {
source.doOnDispose(new Action() {
@Override
public void run() {
child2Unsubscribed.set(true);
}
}).take(5).subscribe(to2);
}
super.onNext(t);
}
};
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testUnsubscribesFromUpstream() {
final AtomicBoolean unsubscribed = new AtomicBoolean(false);
Action unsubscribeAction = new Action() {
@Override
public void run() {
unsubscribed.set(true);
}
};
Observable.just(1)
.concatWith(Observable.<Integer>never())
.doOnDispose(unsubscribeAction)
.takeLast(1)
.subscribe()
.dispose();
assertTrue(unsubscribed.get());
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void noCancelPreviousIterable() {
final AtomicInteger counter = new AtomicInteger();
Observable<Integer> source = Observable.just(1).doOnDispose(new Action() {
@Override
public void run() throws Exception {
counter.getAndIncrement();
}
});
Observable.concat(Arrays.asList(source, source, source, source, source))
.test()
.assertResult(1, 1, 1, 1, 1);
assertEquals(0, counter.get());
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void noCancelPreviousRepeat() {
final AtomicInteger counter = new AtomicInteger();
Observable<Integer> source = Observable.just(1).doOnDispose(new Action() {
@Override
public void run() throws Exception {
counter.getAndIncrement();
}
});
source.repeat(5)
.test()
.assertResult(1, 1, 1, 1, 1);
assertEquals(0, counter.get());
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void noCancelPreviousArray() {
final AtomicInteger counter = new AtomicInteger();
Observable<Integer> source = Observable.just(1).doOnDispose(new Action() {
@Override
public void run() throws Exception {
counter.getAndIncrement();
}
});
Observable.concatArray(source, source, source, source, source)
.test()
.assertResult(1, 1, 1, 1, 1);
assertEquals(0, counter.get());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void noCancelPreviousRepeatUntil() {
final AtomicInteger counter = new AtomicInteger();
Observable<Integer> source = Observable.just(1).doOnDispose(new Action() {
@Override
public void run() throws Exception {
counter.getAndIncrement();
}
});
final AtomicInteger times = new AtomicInteger();
source.repeatUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
return times.getAndIncrement() == 4;
}
})
.test()
.assertResult(1, 1, 1, 1, 1);
assertEquals(0, counter.get());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testUnsubscribesFromUpstreamObservable() {
final AtomicBoolean unsub = new AtomicBoolean();
Observable.range(1, 10).concatWith(Observable.<Integer>never())
.doOnDispose(new Action() {
@Override
public void run() {
unsub.set(true);
}})
.ignoreElements()
.toObservable()
.subscribe()
.dispose();
assertTrue(unsub.get());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testUnsubscribesFromUpstream() {
final AtomicBoolean unsub = new AtomicBoolean();
Observable.range(1, 10).concatWith(Observable.<Integer>never())
.doOnDispose(new Action() {
@Override
public void run() {
unsub.set(true);
}})
.ignoreElements()
.subscribe()
.dispose();
assertTrue(unsub.get());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void take() throws Exception {
Action onCancel = mock(Action.class);
Observable.range(1, 5)
.doOnDispose(onCancel)
.throttleLatest(1, TimeUnit.MINUTES)
.take(1)
.test()
.assertResult(1);
verify(onCancel).run();
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void noCompletionCancelSkip() {
final AtomicInteger counter = new AtomicInteger();
Observable.<Integer>empty()
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
counter.getAndIncrement();
}
})
.buffer(5, 10, TimeUnit.SECONDS)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(Collections.<Integer>emptyList());
assertEquals(0, counter.get());
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void noCompletionCancelOverlap() {
final AtomicInteger counter = new AtomicInteger();
Observable.<Integer>empty()
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
counter.getAndIncrement();
}
})
.buffer(10, 5, TimeUnit.SECONDS)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(Collections.<Integer>emptyList());
assertEquals(0, counter.get());
}
代码示例来源:origin: ReactiveX/RxJava
@SuppressWarnings("unchecked")
@Test
public void noCompletionCancelExact() {
final AtomicInteger counter = new AtomicInteger();
Observable.<Integer>empty()
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
counter.getAndIncrement();
}
})
.buffer(5, TimeUnit.SECONDS)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(Collections.<Integer>emptyList());
assertEquals(0, counter.get());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void normal() {
final int[] calls = { 0 };
Observable.just(1)
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
calls[0]++;
}
})
.unsubscribeOn(Schedulers.single())
.test()
.assertResult(1);
assertEquals(0, calls[0]);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testUsingDisposesEagerlyBeforeCompletion() {
final List<String> events = new ArrayList<String>();
Callable<Resource> resourceFactory = createResourceFactory(events);
final Action completion = createOnCompletedAction(events);
final Action unsub = createUnsubAction(events);
Function<Resource, Observable<String>> observableFactory = new Function<Resource, Observable<String>>() {
@Override
public Observable<String> apply(Resource resource) {
return Observable.fromArray(resource.getTextFromWeb().split(" "));
}
};
Observer<String> observer = TestHelper.mockObserver();
Observable<String> o = Observable.using(resourceFactory, observableFactory,
new DisposeAction(), true)
.doOnDispose(unsub)
.doOnComplete(completion);
o.safeSubscribe(observer);
assertEquals(Arrays.asList("disposed", "completed" /* , "unsub" */), events);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testUsingDoesNotDisposesEagerlyBeforeCompletion() {
final List<String> events = new ArrayList<String>();
Callable<Resource> resourceFactory = createResourceFactory(events);
final Action completion = createOnCompletedAction(events);
final Action unsub = createUnsubAction(events);
Function<Resource, Observable<String>> observableFactory = new Function<Resource, Observable<String>>() {
@Override
public Observable<String> apply(Resource resource) {
return Observable.fromArray(resource.getTextFromWeb().split(" "));
}
};
Observer<String> observer = TestHelper.mockObserver();
Observable<String> o = Observable.using(resourceFactory, observableFactory,
new DisposeAction(), false)
.doOnDispose(unsub)
.doOnComplete(completion);
o.safeSubscribe(observer);
assertEquals(Arrays.asList("completed", /*"unsub",*/ "disposed"), events);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void error() {
final int[] calls = { 0 };
Observable.error(new TestException())
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
calls[0]++;
}
})
.unsubscribeOn(Schedulers.single())
.test()
.assertFailure(TestException.class);
assertEquals(0, calls[0]);
}
内容来源于网络,如有侵权,请联系作者删除!