io.reactivex.Observable.fromFuture()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(6.7k)|赞(0)|评价(0)|浏览(182)

本文整理了Java中io.reactivex.Observable.fromFuture()方法的一些代码示例,展示了Observable.fromFuture()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.fromFuture()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:fromFuture

Observable.fromFuture介绍

[英]Converts a Future into an ObservableSource.

You can convert any object that supports the Future interface into an ObservableSource that emits the return value of the Future#get method of that object, by passing the object into the frommethod.

Important note: This ObservableSource is blocking; you cannot dispose it.

Unlike 1.x, disposing the Observable won't cancel the future. If necessary, one can use composition to achieve the cancellation effect: futureObservableSource.doOnDispose(() -> future.cancel(true));. Scheduler: fromFuture does not operate by default on a particular Scheduler.
[中]将未来转化为可观察的资源。
您可以将任何支持Future接口的对象转换为ObservableSource,通过将该对象传递到frommethod,该对象将发出该对象的Future#get方法的返回值。
*重要提示:*此可观察资源处于阻塞状态;你不能处理它。
不像1。x、 处理可观察的事物不会抵消未来。如有必要,人们可以使用合成来实现消除效果:futureObservableSource。doOnDispose(()->未来。取消(对);。调度器:默认情况下,fromFuture不会在特定的调度器上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void fromFutureSchedulerNull() {
  FutureTask<Object> f = new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null);
  Observable.fromFuture(f, null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void fromFutureTimedUnitNull() {
  Observable.fromFuture(new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null), 1, null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void fromFutureTimedFutureNull() {
  Observable.fromFuture(null, 1, TimeUnit.SECONDS);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void fromFutureNull() {
  Observable.fromFuture(null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void fromFutureTimedSchedulerNull() {
  Observable.fromFuture(new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null), 1, TimeUnit.SECONDS, null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void fromFutureTimedReturnsNull() {
  FutureTask<Object> f = new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null);
  f.run();
  Observable.fromFuture(f, 1, TimeUnit.SECONDS).blockingLast();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testSuccess() throws Exception {
  @SuppressWarnings("unchecked")
  Future<Object> future = mock(Future.class);
  Object value = new Object();
  when(future.get()).thenReturn(value);
  Observer<Object> o = TestHelper.mockObserver();
  TestObserver<Object> to = new TestObserver<Object>(o);
  Observable.fromFuture(future).subscribe(to);
  to.dispose();
  verify(o, times(1)).onNext(value);
  verify(o, times(1)).onComplete();
  verify(o, never()).onError(any(Throwable.class));
  verify(future, never()).cancel(true);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testFailure() throws Exception {
  @SuppressWarnings("unchecked")
  Future<Object> future = mock(Future.class);
  RuntimeException e = new RuntimeException();
  when(future.get()).thenThrow(e);
  Observer<Object> o = TestHelper.mockObserver();
  TestObserver<Object> to = new TestObserver<Object>(o);
  Observable.fromFuture(future).subscribe(to);
  to.dispose();
  verify(o, never()).onNext(null);
  verify(o, never()).onComplete();
  verify(o, times(1)).onError(e);
  verify(future, never()).cancel(true);
}

代码示例来源:origin: ReactiveX/RxJava

public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler) {
  ObjectHelper.requireNonNull(scheduler, "scheduler is null");
  Observable<T> o = fromFuture(future);
  return o.subscribeOn(scheduler);

代码示例来源:origin: ReactiveX/RxJava

public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler) {
  ObjectHelper.requireNonNull(scheduler, "scheduler is null");
  Observable<T> o = fromFuture(future, timeout, unit);
  return o.subscribeOn(scheduler);

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testSuccessOperatesOnSuppliedScheduler() throws Exception {
  @SuppressWarnings("unchecked")
  Future<Object> future = mock(Future.class);
  Object value = new Object();
  when(future.get()).thenReturn(value);
  Observer<Object> o = TestHelper.mockObserver();
  TestScheduler scheduler = new TestScheduler();
  TestObserver<Object> to = new TestObserver<Object>(o);
  Observable.fromFuture(future, scheduler).subscribe(to);
  verify(o, never()).onNext(value);
  scheduler.triggerActions();
  verify(o, times(1)).onNext(value);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testCancelledBeforeSubscribe() throws Exception {
  @SuppressWarnings("unchecked")
  Future<Object> future = mock(Future.class);
  CancellationException e = new CancellationException("unit test synthetic cancellation");
  when(future.get()).thenThrow(e);
  Observer<Object> o = TestHelper.mockObserver();
  TestObserver<Object> to = new TestObserver<Object>(o);
  to.dispose();
  Observable.fromFuture(future).subscribe(to);
  to.assertNoErrors();
  to.assertNotComplete();
}

代码示例来源:origin: redisson/redisson

public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler) {
  ObjectHelper.requireNonNull(scheduler, "scheduler is null");
  Observable<T> o = fromFuture(future);
  return o.subscribeOn(scheduler);

代码示例来源:origin: redisson/redisson

public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler) {
  ObjectHelper.requireNonNull(scheduler, "scheduler is null");
  Observable<T> o = fromFuture(future, timeout, unit);
  return o.subscribeOn(scheduler);

代码示例来源:origin: ReactiveX/RxJava

@Test
public void fromFutureReturnsNull() {
  FutureTask<Object> f = new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null);
  f.run();
  TestObserver<Object> to = new TestObserver<Object>();
  Observable.fromFuture(f).subscribe(to);
  to.assertNoValues();
  to.assertNotComplete();
  to.assertError(NullPointerException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void fromFutureTimeout() throws Exception {
  Observable.fromFuture(Observable.never()
  .toFuture(), 100, TimeUnit.MILLISECONDS, Schedulers.io())
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertFailure(TimeoutException.class);
}

代码示例来源:origin: ReactiveX/RxJava

Observable<Object> futureObservable = Observable.fromFuture(future);

代码示例来源:origin: cn.leancloud/storage-core

FutureTask<List<AVObject>> futureTask = new FutureTask<List<AVObject>>(callable);
executor.submit(futureTask);
return Observable.fromFuture(futureTask);

代码示例来源:origin: cn.leancloud/storage-core

FutureTask<String> futureTask = new FutureTask<>(callable);
executor.submit(futureTask);
Observable result = Observable.fromFuture(futureTask);
if (isAsync) {
 result = result.subscribeOn(Schedulers.io());

相关文章

Observable类方法