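This article collects code examples for the Java method io.reactivex.Observable.fromFuture() and shows how Observable.fromFuture() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they can serve as a practical reference. The details of Observable.fromFuture() are as follows: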
Fully qualified name: io.reactivex.Observable
Class name: Observable
Method name: fromFuture
Converts a Future into an ObservableSource.
You can convert any object that supports the Future interface into an ObservableSource that emits the return value of that object's Future#get method by passing the object into the fromFuture method.
Important note: This ObservableSource is blocking; you cannot dispose it.
Unlike 1.x, disposing the Observable won't cancel the future. If necessary, one can use composition to achieve the cancellation effect: futureObservableSource.doOnDispose(() -> future.cancel(true));
Scheduler: fromFuture does not operate by default on a particular Scheduler.
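The cancellation composition described above relies on plain Future#cancel. As a minimal JDK-only sketch (no RxJava involved; the class FutureCancelSketch and its describe helper are hypothetical names used for illustration), the following shows what a caller of Future#get observes once the future has been cancelled:

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class FutureCancelSketch {

    // Hypothetical helper: reports how a blocking Future#get call ends.
    public static String describe(Future<?> future) {
        try {
            return "value:" + future.get();
        } catch (CancellationException e) {
            // Thrown by get() when the future was cancelled.
            return "cancelled";
        } catch (ExecutionException e) {
            return "failed:" + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // A future that nobody completes; get() would block on it indefinitely.
        CompletableFuture<String> pending = new CompletableFuture<>();

        // The explicit cancel call, as in doOnDispose(() -> future.cancel(true)).
        pending.cancel(true);

        System.out.println(pending.isCancelled());
        System.out.println(describe(pending));
    }
}
```

Only cancel the future this way when it is not shared with other consumers, since every caller of get() will then see the cancellation.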
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void fromFutureSchedulerNull() {
    // A null Scheduler must be rejected with a NullPointerException.
    FutureTask<Object> f = new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null);
    Observable.fromFuture(f, null);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void fromFutureTimedUnitNull() {
    // A null TimeUnit must be rejected with a NullPointerException.
    Observable.fromFuture(new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null), 1, null);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void fromFutureTimedFutureNull() {
    // A null Future must be rejected in the timed overload as well.
    Observable.fromFuture(null, 1, TimeUnit.SECONDS);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void fromFutureNull() {
    // A null Future must be rejected with a NullPointerException.
    Observable.fromFuture(null);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void fromFutureTimedSchedulerNull() {
    // A null Scheduler must be rejected in the timed overload as well.
    Observable.fromFuture(new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null), 1, TimeUnit.SECONDS, null);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void fromFutureTimedReturnsNull() {
    FutureTask<Object> f = new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null);
    // Complete the task; its result is null.
    f.run();
    // The null result surfaces as a NullPointerException from blockingLast().
    Observable.fromFuture(f, 1, TimeUnit.SECONDS).blockingLast();
}
Code example source: ReactiveX/RxJava
@Test
public void testSuccess() throws Exception {
    @SuppressWarnings("unchecked")
    Future<Object> future = mock(Future.class);
    Object value = new Object();
    when(future.get()).thenReturn(value);

    Observer<Object> o = TestHelper.mockObserver();
    TestObserver<Object> to = new TestObserver<Object>(o);

    Observable.fromFuture(future).subscribe(to);
    to.dispose();

    verify(o, times(1)).onNext(value);
    verify(o, times(1)).onComplete();
    verify(o, never()).onError(any(Throwable.class));
    // Disposing the observer must not cancel the underlying future.
    verify(future, never()).cancel(true);
}
Code example source: ReactiveX/RxJava
@Test
public void testFailure() throws Exception {
    @SuppressWarnings("unchecked")
    Future<Object> future = mock(Future.class);
    RuntimeException e = new RuntimeException();
    when(future.get()).thenThrow(e);

    Observer<Object> o = TestHelper.mockObserver();
    TestObserver<Object> to = new TestObserver<Object>(o);

    Observable.fromFuture(future).subscribe(to);
    to.dispose();

    // The exception thrown by get() is delivered through onError only.
    verify(o, never()).onNext(null);
    verify(o, never()).onComplete();
    verify(o, times(1)).onError(e);
    verify(future, never()).cancel(true);
}
Code example source: ReactiveX/RxJava
public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    Observable<T> o = fromFuture(future);
    return o.subscribeOn(scheduler);
}
Code example source: ReactiveX/RxJava
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    Observable<T> o = fromFuture(future, timeout, unit);
    return o.subscribeOn(scheduler);
}
Code example source: ReactiveX/RxJava
@Test
public void testSuccessOperatesOnSuppliedScheduler() throws Exception {
    @SuppressWarnings("unchecked")
    Future<Object> future = mock(Future.class);
    Object value = new Object();
    when(future.get()).thenReturn(value);

    Observer<Object> o = TestHelper.mockObserver();
    TestScheduler scheduler = new TestScheduler();
    TestObserver<Object> to = new TestObserver<Object>(o);

    Observable.fromFuture(future, scheduler).subscribe(to);

    // Nothing is emitted until the TestScheduler runs the pending subscription.
    verify(o, never()).onNext(value);
    scheduler.triggerActions();
    verify(o, times(1)).onNext(value);
}
Code example source: ReactiveX/RxJava
@Test
public void testCancelledBeforeSubscribe() throws Exception {
    @SuppressWarnings("unchecked")
    Future<Object> future = mock(Future.class);
    CancellationException e = new CancellationException("unit test synthetic cancellation");
    when(future.get()).thenThrow(e);

    Observer<Object> o = TestHelper.mockObserver();
    TestObserver<Object> to = new TestObserver<Object>(o);

    // The observer is disposed before subscribing, so no signal reaches it.
    to.dispose();
    Observable.fromFuture(future).subscribe(to);

    to.assertNoErrors();
    to.assertNotComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void fromFutureReturnsNull() {
    FutureTask<Object> f = new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null);
    // Complete the task; its result is null.
    f.run();

    TestObserver<Object> to = new TestObserver<Object>();
    Observable.fromFuture(f).subscribe(to);

    // A null result is reported as a NullPointerException instead of an item.
    to.assertNoValues();
    to.assertNotComplete();
    to.assertError(NullPointerException.class);
}
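The test above depends on a JDK detail: a FutureTask built from a Runnable and a null result completes with null, and RxJava 2 does not allow null items, hence the NullPointerException. A JDK-only sketch of that detail follows. It assumes a no-op lambda in place of RxJava's internal Functions.EMPTY_RUNNABLE, and the class and method names are hypothetical:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class NullResultFutureSketch {

    // Hypothetical helper: runs a no-op task and returns whatever get() yields.
    public static Object resultOfNoOpTask() {
        // The second constructor argument is the value get() returns on success.
        FutureTask<Object> task = new FutureTask<Object>(() -> { }, null);
        task.run();
        try {
            return task.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(resultOfNoOpTask() == null);
    }
}
```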
Code example source: ReactiveX/RxJava
@Test
public void fromFutureTimeout() throws Exception {
    // The source future never completes, so the 100 ms timeout must fire.
    Observable.fromFuture(Observable.never().toFuture(), 100, TimeUnit.MILLISECONDS, Schedulers.io())
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertFailure(TimeoutException.class);
}
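The timed overloads bound how long the wait for the future may last. As a hedged JDK-only sketch of the kind of failure the test above expects, a timed Future#get on a future that never completes throws TimeoutException. The class TimedGetSketch and the helper timesOut are hypothetical, and a never-completed CompletableFuture stands in for Observable.never().toFuture():

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedGetSketch {

    // Hypothetical helper: true if the timed wait expires before a result arrives.
    public static boolean timesOut(Future<?> future, long timeout, TimeUnit unit) {
        try {
            future.get(timeout, unit);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (ExecutionException e) {
            // The future failed rather than timing out.
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Never completed, so the timed get() cannot return a value.
        CompletableFuture<Object> never = new CompletableFuture<>();
        System.out.println(timesOut(never, 100, TimeUnit.MILLISECONDS));
    }
}
```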
Code example source: ReactiveX/RxJava
Observable<Object> futureObservable = Observable.fromFuture(future);
Code example source: cn.leancloud/storage-core
// callable and executor are defined elsewhere in the original source.
FutureTask<List<AVObject>> futureTask = new FutureTask<List<AVObject>>(callable);
executor.submit(futureTask);
return Observable.fromFuture(futureTask);
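The snippet above leaves callable and executor undefined. A minimal JDK-only sketch of that setup follows, assuming a single-thread executor and a trivial Callable (both are illustrative assumptions, as are the class and method names). It shows that the FutureTask itself is the Future to wait on after it has been submitted:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class FutureTaskSubmitSketch {

    // Hypothetical helper: runs the callable on an executor and waits for its result.
    public static String runOnExecutor(Callable<String> callable) {
        // Assumption: a single-thread executor; the original executor is not shown.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            FutureTask<String> futureTask = new FutureTask<>(callable);
            // submit() treats the task as a Runnable; the result stays in futureTask.
            executor.submit(futureTask);
            return futureTask.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnExecutor(() -> "done"));
    }
}
```

In the original snippet the same futureTask is handed to Observable.fromFuture instead of being waited on directly.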
Code example source: cn.leancloud/storage-core
FutureTask<String> futureTask = new FutureTask<>(callable);
executor.submit(futureTask);
Observable<String> result = Observable.fromFuture(futureTask);
if (isAsync) {
    // Move the blocking wait on the future onto the io scheduler.
    result = result.subscribeOn(Schedulers.io());
}