本文整理了Java中io.reactivex.Observable.takeUntil()
方法的一些代码示例,展示了Observable.takeUntil()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.takeUntil()
方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:takeUntil
[英]Returns an Observable that emits the items emitted by the source Observable until a second ObservableSource emits an item.
Scheduler: takeUntil does not operate by default on a particular Scheduler.
[中]返回一个Observable,该Observable发出源Observable发出的项,直到第二个ObservableSource发出项为止。
调度程序:默认情况下,TakeTill不会在特定调度程序上运行。
代码示例来源:origin: ReactiveX/RxJava
@Override
public Observable<Integer> apply(Observable<Integer> o) throws Exception {
return o.takeUntil(Observable.never());
}
});
代码示例来源:origin: ReactiveX/RxJava
@Override
public Observable<Integer> apply(Observable<Integer> xs) {
return xs.takeUntil(xs.skipWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer i) {
return i <= 3;
}
}));
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public ObservableSource<Object> apply(Observable<Object> o) throws Exception {
return o.takeUntil(Functions.alwaysFalse());
}
});
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void takeUntilPredicateNull() {
just1.takeUntil((Predicate<Integer>)null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void takeUntilObservableNull() {
just1.takeUntil((Observable<Integer>)null);
}
代码示例来源:origin: ReactiveX/RxJava
/**
* Returns an Observable that emits those items emitted by source ObservableSource before a specified time runs
* out.
* <p>
* If time runs out before the {@code Observable} completes normally, the {@code onComplete} event will be
* signaled on the default {@code computation} {@link Scheduler}.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/take.t.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This version of {@code take} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
*
* @param time
* the length of the time window
* @param unit
* the time unit of {@code time}
* @return an Observable that emits those items emitted by the source ObservableSource before the time runs out
* @see <a href="http://reactivex.io/documentation/operators/take.html">ReactiveX operators documentation: Take</a>
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> take(long time, TimeUnit unit) {
return takeUntil(timer(time, unit));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void takeAll() {
Observer<Object> o = TestHelper.mockObserver();
Observable.just(1, 2).takeUntil(new Predicate<Integer>() {
@Override
public boolean test(Integer v) {
return false;
}
}).subscribe(o);
verify(o).onNext(1);
verify(o).onNext(2);
verify(o, never()).onError(any(Throwable.class));
verify(o).onComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void takeEmpty() {
Observer<Object> o = TestHelper.mockObserver();
Observable.empty().takeUntil(new Predicate<Object>() {
@Override
public boolean test(Object v) {
return true;
}
}).subscribe(o);
verify(o, never()).onNext(any());
verify(o, never()).onError(any(Throwable.class));
verify(o).onComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void takeFirst() {
Observer<Object> o = TestHelper.mockObserver();
Observable.just(1, 2).takeUntil(new Predicate<Integer>() {
@Override
public boolean test(Integer v) {
return true;
}
}).subscribe(o);
verify(o).onNext(1);
verify(o, never()).onNext(2);
verify(o, never()).onError(any(Throwable.class));
verify(o).onComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void functionThrows() {
Observer<Object> o = TestHelper.mockObserver();
Predicate<Integer> predicate = (new Predicate<Integer>() {
@Override
public boolean test(Integer t1) {
throw new TestException("Forced failure");
}
});
Observable.just(1, 2, 3).takeUntil(predicate).subscribe(o);
verify(o).onNext(1);
verify(o, never()).onNext(2);
verify(o, never()).onNext(3);
verify(o).onError(any(TestException.class));
verify(o, never()).onComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void takeSome() {
Observer<Object> o = TestHelper.mockObserver();
Observable.just(1, 2, 3).takeUntil(new Predicate<Integer>() {
@Override
public boolean test(Integer t1) {
return t1 == 2;
}
})
.subscribe(o);
verify(o).onNext(1);
verify(o).onNext(2);
verify(o, never()).onNext(3);
verify(o, never()).onError(any(Throwable.class));
verify(o).onComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testTakeUntilSourceCompleted() {
Disposable sSource = mock(Disposable.class);
Disposable sOther = mock(Disposable.class);
TestObservable source = new TestObservable(sSource);
TestObservable other = new TestObservable(sOther);
Observer<String> result = TestHelper.mockObserver();
Observable<String> stringObservable = Observable.unsafeCreate(source).takeUntil(Observable.unsafeCreate(other));
stringObservable.subscribe(result);
source.sendOnNext("one");
source.sendOnNext("two");
source.sendOnCompleted();
verify(result, times(1)).onNext("one");
verify(result, times(1)).onNext("two");
verify(sSource, never()).dispose(); // no longer disposing itself on terminal events
verify(sOther, times(1)).dispose();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testTakeUntilSourceError() {
Disposable sSource = mock(Disposable.class);
Disposable sOther = mock(Disposable.class);
TestObservable source = new TestObservable(sSource);
TestObservable other = new TestObservable(sOther);
Throwable error = new Throwable();
Observer<String> result = TestHelper.mockObserver();
Observable<String> stringObservable = Observable.unsafeCreate(source).takeUntil(Observable.unsafeCreate(other));
stringObservable.subscribe(result);
source.sendOnNext("one");
source.sendOnNext("two");
source.sendOnError(error);
source.sendOnNext("three");
verify(result, times(1)).onNext("one");
verify(result, times(1)).onNext("two");
verify(result, times(0)).onNext("three");
verify(result, times(1)).onError(error);
verify(sSource, never()).dispose(); // no longer disposing itself on terminal events
verify(sOther, times(1)).dispose();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void sourceThrows() {
Observer<Object> o = TestHelper.mockObserver();
Observable.just(1)
.concatWith(Observable.<Integer>error(new TestException()))
.concatWith(Observable.just(2))
.takeUntil(new Predicate<Integer>() {
@Override
public boolean test(Integer v) {
return false;
}
}).subscribe(o);
verify(o).onNext(1);
verify(o, never()).onNext(2);
verify(o).onError(any(TestException.class));
verify(o, never()).onComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testTakeUntilOtherError() {
Disposable sSource = mock(Disposable.class);
Disposable sOther = mock(Disposable.class);
TestObservable source = new TestObservable(sSource);
TestObservable other = new TestObservable(sOther);
Throwable error = new Throwable();
Observer<String> result = TestHelper.mockObserver();
Observable<String> stringObservable = Observable.unsafeCreate(source).takeUntil(Observable.unsafeCreate(other));
stringObservable.subscribe(result);
source.sendOnNext("one");
source.sendOnNext("two");
other.sendOnError(error);
source.sendOnNext("three");
verify(result, times(1)).onNext("one");
verify(result, times(1)).onNext("two");
verify(result, times(0)).onNext("three");
verify(result, times(1)).onError(error);
verify(result, times(0)).onComplete();
verify(sSource, times(1)).dispose();
verify(sOther, never()).dispose(); // no longer disposing itself on termination
}
代码示例来源:origin: ReactiveX/RxJava
/**
* If the 'other' onCompletes then we unsubscribe from the source and onComplete.
*/
@Test
public void testTakeUntilOtherCompleted() {
Disposable sSource = mock(Disposable.class);
Disposable sOther = mock(Disposable.class);
TestObservable source = new TestObservable(sSource);
TestObservable other = new TestObservable(sOther);
Observer<String> result = TestHelper.mockObserver();
Observable<String> stringObservable = Observable.unsafeCreate(source).takeUntil(Observable.unsafeCreate(other));
stringObservable.subscribe(result);
source.sendOnNext("one");
source.sendOnNext("two");
other.sendOnCompleted();
source.sendOnNext("three");
verify(result, times(1)).onNext("one");
verify(result, times(1)).onNext("two");
verify(result, times(0)).onNext("three");
verify(result, times(1)).onComplete();
verify(sSource, times(1)).dispose();
verify(sOther, never()).dispose(); // no longer disposing itself on terminal events
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testTakeUntil() {
Disposable sSource = mock(Disposable.class);
Disposable sOther = mock(Disposable.class);
TestObservable source = new TestObservable(sSource);
TestObservable other = new TestObservable(sOther);
Observer<String> result = TestHelper.mockObserver();
Observable<String> stringObservable = Observable.unsafeCreate(source)
.takeUntil(Observable.unsafeCreate(other));
stringObservable.subscribe(result);
source.sendOnNext("one");
source.sendOnNext("two");
other.sendOnNext("three");
source.sendOnNext("four");
source.sendOnCompleted();
other.sendOnCompleted();
verify(result, times(1)).onNext("one");
verify(result, times(1)).onNext("two");
verify(result, times(0)).onNext("three");
verify(result, times(0)).onNext("four");
verify(sSource, times(1)).dispose();
verify(sOther, times(1)).dispose();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testErrorIncludesLastValueAsCause() {
TestObserver<String> to = new TestObserver<String>();
final TestException e = new TestException("Forced failure");
Predicate<String> predicate = (new Predicate<String>() {
@Override
public boolean test(String t) {
throw e;
}
});
Observable.just("abc").takeUntil(predicate).subscribe(to);
to.assertTerminated();
to.assertNotComplete();
to.assertError(TestException.class);
// FIXME last cause value is not saved
// assertTrue(ts.errors().get(0).getCause().getMessage().contains("abc"));
}
代码示例来源:origin: ReactiveX/RxJava
@Test /* (timeout = 8000) */
public void testSingleSourceManyIterators() throws InterruptedException {
Observable<Long> o = Observable.interval(250, TimeUnit.MILLISECONDS);
PublishSubject<Integer> terminal = PublishSubject.create();
Observable<Long> source = o.takeUntil(terminal);
Iterable<Long> iter = source.blockingNext();
for (int j = 0; j < 3; j++) {
BlockingObservableNext.NextIterator<Long> it = (BlockingObservableNext.NextIterator<Long>)iter.iterator();
for (long i = 0; i < 10; i++) {
Assert.assertEquals(true, it.hasNext());
Assert.assertEquals(j + "th iteration next", Long.valueOf(i), it.next());
}
terminal.onNext(1);
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void disconnectBeforeConnect() {
BehaviorSubject<Integer> subject = BehaviorSubject.create();
Observable<Integer> observable = subject
.replay(1)
.refCount();
observable.takeUntil(Observable.just(1)).test();
subject.onNext(2);
observable.take(1).test().assertResult(2);
}
}
内容来源于网络,如有侵权,请联系作者删除!