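This article collects code examples for the Java class io.reactivex.Observable and shows how the class is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as useful references. Details of the Observable class are as follows:
Package: io.reactivex
Class name: Observable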
The Observable class is the non-backpressured, optionally multi-valued base reactive class that offers factory methods, intermediate operators and the ability to consume synchronous and/or asynchronous reactive dataflows.
Many operators in the class accept ObservableSource(s), the base reactive interface for such non-backpressured flows, which Observable itself implements as well.
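As a minimal sketch of what accepting an ObservableSource means in practice, the snippet below passes a hand-written ObservableSource to concatWith. It assumes RxJava 2.x on the classpath and Java 8 lambdas; the class and method names (ObservableSourceSketch, three, concatenated) are hypothetical and not part of the library.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.disposables.Disposables;

import java.util.List;

public class ObservableSourceSketch {
    // Hypothetical hand-written source: emits 3, then completes.
    static ObservableSource<Integer> three() {
        return observer -> {
            observer.onSubscribe(Disposables.empty()); // the protocol requires onSubscribe first
            observer.onNext(3);
            observer.onComplete();
        };
    }

    // Observable operators accept any ObservableSource, not only Observable instances.
    static List<Integer> concatenated() {
        return Observable.just(1, 2)
                .concatWith(three())
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(concatenated()); // prints [1, 2, 3]
    }
}
```

In real code it is usually safer to build sources with Observable.create or the factory methods, since they take care of the protocol details that this sketch handles by hand.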
The Observable's operators, by default, run with a buffer size of 128 elements (see Flowable#bufferSize()), which can be overridden globally via the system property rx2.buffer-size. Most operators, however, have overloads that allow setting their internal buffer size explicitly.
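The following sketch illustrates both halves of that statement: reading the global default and overriding the buffer size for a single operator. It assumes RxJava 2.x; the class name BufferSizeSketch and the buffer size of 16 are arbitrary choices made for illustration.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;

public class BufferSizeSketch {
    // The global default: 128 unless the rx2.buffer-size system property overrides it.
    static int defaultBufferSize() {
        return Observable.bufferSize();
    }

    // observeOn has an overload that takes an explicit internal buffer size.
    static List<Integer> withExplicitBuffer() {
        return Observable.range(1, 5)
                .observeOn(Schedulers.single(), false, 16) // delayError = false, bufferSize = 16
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(defaultBufferSize());
        System.out.println(withExplicitBuffer()); // prints [1, 2, 3, 4, 5]
    }
}
```

The system property is read when the RxJava classes are initialized, so it would have to be set on the JVM command line (for example -Drx2.buffer-size=256) or before any RxJava class is loaded.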
The documentation for this class makes use of marble diagrams.
The design of this class was derived from the Reactive-Streams design and specification by removing any backpressure-related infrastructure and implementation detail, replacing the org.reactivestreams.Subscription with Disposable as the primary means to cancel a flow.
The Observable follows the protocol
onSubscribe onNext* (onError | onComplete)?
where the stream can be disposed through the Disposable instance provided to consumers through Observer.onSubscribe.
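To make the protocol concrete, the sketch below records the callbacks a plain Observer receives, once for a normally completing source and once for a failing one. It assumes RxJava 2.x; ProtocolSketch and record are hypothetical names used only here.

```java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

import java.util.ArrayList;
import java.util.List;

public class ProtocolSketch {
    // Subscribes to a synchronous source and returns the callbacks in the order received.
    static List<String> record(Observable<Integer> source) {
        final List<String> events = new ArrayList<>();
        source.subscribe(new Observer<Integer>() {
            @Override public void onSubscribe(Disposable d) { events.add("onSubscribe"); }
            @Override public void onNext(Integer v) { events.add("onNext(" + v + ")"); }
            @Override public void onError(Throwable e) { events.add("onError"); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        // prints [onSubscribe, onNext(1), onNext(2), onComplete]
        System.out.println(record(Observable.just(1, 2)));
        // prints [onSubscribe, onError]
        System.out.println(record(Observable.<Integer>error(new RuntimeException("boom"))));
    }
}
```

The Disposable handed to onSubscribe is ignored in this sketch; an Observer that needs to cancel the flow would store it and call dispose() on it.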
Unlike the Observable of version 1.x, #subscribe(Observer) does not allow external cancellation of a subscription and the Observer instance is expected to expose such capability.
Example:
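import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableObserver;
import java.util.concurrent.TimeUnit;

Disposable d = Observable.just("Hello world!")
        .delay(1, TimeUnit.SECONDS)
        .subscribeWith(new DisposableObserver<String>() {
            @Override public void onStart() {
                System.out.println("Start!");
            }
            // The item type must match the DisposableObserver<String> type argument
            @Override public void onNext(String t) {
                System.out.println(t);
            }
            @Override public void onError(Throwable t) {
                t.printStackTrace();
            }
            @Override public void onComplete() {
                System.out.println("Done!");
            }
        });
Thread.sleep(500);
// the sequence can now be cancelled via dispose();
// here that happens before the delayed item arrives
d.dispose();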
Code example source: ReactiveX/RxJava
    @Override
    public Observable<Integer> apply(Integer t) {
        Observable<Integer> o = Observable.just(t)
                .subscribeOn(sch);
        Subject<Integer> subject = UnicastSubject.create();
        o.subscribe(subject);
        return subject;
    }
};
Code example source: ReactiveX/RxJava
@Test(expected = IllegalArgumentException.class)
public void testMapWithIssue417() {
    Observable.just(1).observeOn(Schedulers.computation())
            .map(new Function<Integer, Integer>() {
                @Override
                public Integer apply(Integer arg0) {
                    throw new IllegalArgumentException("any error");
                }
            }).blockingSingle();
}
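A self-contained variant of the test above, written as a sketch that assumes RxJava 2.x and Java 8 lambdas, shows the same behavior without JUnit: an unchecked exception thrown inside map on another thread is rethrown to the caller of blockingSingle. MapErrorSketch and failingMap are hypothetical names.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class MapErrorSketch {
    // Returns the message of the exception that blockingSingle() rethrows.
    static String failingMap() {
        try {
            Observable.just(1)
                    .observeOn(Schedulers.computation())
                    .<Integer>map(v -> {
                        throw new IllegalArgumentException("any error");
                    })
                    .blockingSingle();
            return "no error";
        } catch (IllegalArgumentException e) {
            // RuntimeExceptions are rethrown as they are, on the blocked thread
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(failingMap()); // prints any error
    }
}
```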
Code example source: ReactiveX/RxJava
    @Override
    public Object apply(Observable<Integer> o) throws Exception {
        return Observable.just(1).window(o).flatMap(new Function<Observable<Integer>, ObservableSource<Integer>>() {
            @Override
            public ObservableSource<Integer> apply(Observable<Integer> v) throws Exception {
                return v;
            }
        });
    }
}, false, 1, 1, (Object[])null);
Code example source: ReactiveX/RxJava
    @Override
    public Observable<String> apply(final String s) {
        return Observable.just(s)
                .mergeWith(Observable.interval(10, TimeUnit.MILLISECONDS)
                        .map(new Function<Long, String>() {
                            @Override
                            public String apply(Long i) {
                                return s + " " + i;
                            }
                        })).take(250);
    }
})
Code example source: ReactiveX/RxJava
    @Override
    public Object apply(Observable<Object> o) throws Exception {
        return o.window(Observable.never()).flatMap(new Function<Observable<Object>, ObservableSource<Object>>() {
            @Override
            public ObservableSource<Object> apply(Observable<Object> v) throws Exception {
                return v;
            }
        });
    }
}, false, 1, 1, 1);
Code example source: ReactiveX/RxJava
    @Override
    public ObservableSource<Object> apply(Integer v) throws Exception {
        return Observable.range(1, 2).map(new Function<Integer, Object>() {
            @Override
            public Object apply(Integer w) throws Exception {
                throw new TestException();
            }
        });
    }
})
Code example source: ReactiveX/RxJava
@Test
public void innerEscapeCompleted() {
    Observable<Integer> source = Observable.just(0);
    Observable<Integer> m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER);
    TestObserver<Object> to = new TestObserver<Object>();
    m.subscribe(to);
    to.awaitTerminalEvent();
    to.assertNoErrors();
    System.out.println(to.values());
}
Code example source: ReactiveX/RxJava
@Test
public void testBackpressure1() {
    TestObserver<Integer> to = new TestObserver<Integer>();
    Observable.range(1, 100000).takeLast(1)
            .observeOn(Schedulers.newThread())
            .map(newSlowProcessor()).subscribe(to);
    to.awaitTerminalEvent();
    to.assertNoErrors();
    to.assertValue(100000);
}
Code example source: ReactiveX/RxJava
@Ignore("Not sure what this does")
@Test
public void addOnNextValueExceptionAdded() throws Exception {
    Observer<BadToString> observer = new BadToStringObserver();
    Observable.just(new BadToString(false))
            .map(new Function<BadToString, BadToString>() {
                @Override
                public BadToString apply(BadToString badToString) {
                    throw new IllegalArgumentException("Failure while handling");
                }
            }).subscribe(observer);
}
Code example source: ReactiveX/RxJava
@Test
public void testTake2() {
    Observable<Integer> o = Observable.just(1, 2, 3, 4, 5);
    Iterable<String> it = Arrays.asList("a", "b", "c", "d", "e");
    SquareStr squareStr = new SquareStr();
    o.map(squareStr).zipWith(it, concat2Strings).take(2).subscribe(printer);
    assertEquals(2, squareStr.counter.get());
}
Code example source: ReactiveX/RxJava
@Test
public void testTakeFirstWithPredicateOfNoneMatchingThePredicate() {
    Observable<Integer> o = Observable.just(1, 3, 5, 7, 9, 7, 5, 3, 1);
    o.filter(IS_EVEN).take(1).subscribe(w);
    verify(w, never()).onNext(anyInt());
    verify(w, times(1)).onComplete();
    verify(w, never()).onError(any(Throwable.class));
}
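The test above relies on helpers defined elsewhere in the RxJava test suite (IS_EVEN and the mock observer w). A self-contained sketch of the same idea, assuming RxJava 2.x and Java 8 lambdas, might look like the following; FilterTakeSketch and firstEven are hypothetical names, and the inline lambda stands in for IS_EVEN.

```java
import io.reactivex.Observable;

import java.util.List;

public class FilterTakeSketch {
    // Collects at most one even value; the list is empty when nothing matches.
    static List<Integer> firstEven(Integer... values) {
        return Observable.fromArray(values)
                .filter(v -> v % 2 == 0) // stand-in for the IS_EVEN predicate
                .take(1)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(firstEven(1, 3, 5, 7, 9)); // prints []
        System.out.println(firstEven(1, 2, 4));       // prints [2]
    }
}
```

An empty result corresponds to the mock verification in the test: no onNext call, one onComplete, and no onError.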
Code example source: ReactiveX/RxJava
@Test
public void workerNotDisposedPrematurelyNormalInAsyncOut() {
    DisposeTrackingScheduler s = new DisposeTrackingScheduler();
    TestObserver<Integer> to = new TestObserverFusedCanceling();
    Observable.just(1).hide().observeOn(s).subscribe(to);
    assertEquals(1, s.disposedCount.get());
}
Code example source: ReactiveX/RxJava
@Test
public void testFirstOrElseWithPredicateOfNoneMatchingThePredicateObservable() {
    Observable<String> src = Observable.just("a", "b", "c");
    src.filter(IS_D).first("default").toObservable().subscribe(w);
    verify(w, times(1)).onNext(anyString());
    verify(w, times(1)).onNext("default");
    verify(w, never()).onError(any(Throwable.class));
    verify(w, times(1)).onComplete();
}
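As a rough standalone counterpart to this test, the sketch below shows first(defaultItem) falling back to the default when the filtered sequence is empty. It assumes RxJava 2.x and Java 8 lambdas; FirstDefaultSketch and firstD are hypothetical names, and the lambda replaces the IS_D predicate from the test suite.

```java
import io.reactivex.Observable;

public class FirstDefaultSketch {
    // Returns the first "d" in the source, or "default" if there is none.
    static String firstD(Observable<String> src) {
        return src.filter(s -> s.equals("d")) // stand-in for the IS_D predicate
                .first("default")             // first(T) returns a Single<T>
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(firstD(Observable.just("a", "b", "c"))); // prints default
        System.out.println(firstD(Observable.just("a", "d")));      // prints d
    }
}
```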
Code example source: ReactiveX/RxJava
@Test
public void nonFusedConditional() {
    TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.SYNC);
    Observable.range(1, 5).hide()
            .doFinally(this)
            .filter(Functions.alwaysTrue())
            .subscribe(to);
    ObserverFusion.assertFusion(to, QueueFuseable.NONE)
            .assertResult(1, 2, 3, 4, 5);
    assertEquals(1, calls);
}
Code example source: ReactiveX/RxJava
@Test
public void testError2() {
    Observable<Integer> source = Observable.concat(Observable.just(0),
            Observable.<Integer> error(new TestException("Forced failure")));
    Observable<Integer> m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER);
    TestObserver<Object> to = new TestObserver<Object>();
    m.subscribe(to);
    to.awaitTerminalEvent();
    assertEquals(1, to.errorCount());
    to.assertValueCount(1);
}
Code example source: ReactiveX/RxJava
/**
 * This tests a no-op path: ImmediateThinScheduler runs tasks immediately on the calling thread, so no real scheduling takes place.
 */
@Test
public void testObserveOn() {
    Observer<Integer> observer = TestHelper.mockObserver();
    Observable.just(1, 2, 3).observeOn(ImmediateThinScheduler.INSTANCE).subscribe(observer);
    verify(observer, times(1)).onNext(1);
    verify(observer, times(1)).onNext(2);
    verify(observer, times(1)).onNext(3);
    verify(observer, times(1)).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testFlatMapTransformsOnErrorFuncThrows() {
    Observable<Integer> onNext = Observable.fromIterable(Arrays.asList(1, 2, 3));
    Observable<Integer> onComplete = Observable.fromIterable(Arrays.asList(4));
    Observable<Integer> onError = Observable.fromIterable(Arrays.asList(5));
    Observable<Integer> source = Observable.error(new TestException());
    Observer<Object> o = TestHelper.mockObserver();
    source.flatMap(just(onNext), funcThrow((Throwable) null, onError), just0(onComplete)).subscribe(o);
    verify(o).onError(any(CompositeException.class));
    verify(o, never()).onNext(any());
    verify(o, never()).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void outputFused() {
    TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
    Observable.range(1, 5).hide()
            .observeOn(Schedulers.single())
            .subscribe(to);
    ObserverFusion.assertFusion(to, QueueFuseable.ASYNC)
            .awaitDone(5, TimeUnit.SECONDS)
            .assertResult(1, 2, 3, 4, 5);
}
Code example source: ReactiveX/RxJava
@Test
public void testFlatMapTransformsOnCompletedFuncThrows() {
    Observable<Integer> onNext = Observable.fromIterable(Arrays.asList(1, 2, 3));
    Observable<Integer> onComplete = Observable.fromIterable(Arrays.asList(4));
    Observable<Integer> onError = Observable.fromIterable(Arrays.asList(5));
    Observable<Integer> source = Observable.fromIterable(Arrays.<Integer> asList());
    Observer<Object> o = TestHelper.mockObserver();
    source.flatMap(just(onNext), just(onError), funcThrow0(onComplete)).subscribe(o);
    verify(o).onError(any(TestException.class));
    verify(o, never()).onNext(any());
    verify(o, never()).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testLongTimeAction() throws InterruptedException {
    final CountDownLatch latch = new CountDownLatch(1);
    LongTimeAction action = new LongTimeAction(latch);
    Observable.just(1).buffer(10, TimeUnit.MILLISECONDS, 10)
            .subscribe(action);
    latch.await();
    assertFalse(action.fail);
}