io.reactivex.Observable类的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(10.3k)|赞(0)|评价(0)|浏览(231)

本文整理了Java中io.reactivex.Observable类的一些代码示例,展示了Observable类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable类的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable

Observable介绍

[英]The Observable class is the non-backpressured, optionally multi-valued base reactive class that offers factory methods, intermediate operators and the ability to consume synchronous and/or asynchronous reactive dataflows.

Many operators in the class accept ObservableSource(s), the base reactive interface for such non-backpressured flows, which Observable itself implements as well.

The Observable's operators, by default, run with a buffer size of 128 elements (see Flowable#bufferSize(), that can be overridden globally via the system parameter rx2.buffer-size. Most operators, however, have overloads that allow setting their internal buffer size explicitly.

The documentation for this class makes use of marble diagrams. The following legend explains these diagrams:

The design of this class was derived from the Reactive-Streams design and specification by removing any backpressure-related infrastructure and implementation detail, replacing the org.reactivestreams.Subscription with Disposable as the primary means to cancel a flow.

The Observable follows the protocol

onSubscribe onNext* (onError | onComplete)?

where the stream can be disposed through the Disposable instance provided to consumers through Observer.onSubscribe.

Unlike the Observable of version 1.x, #subscribe(Observer) does not allow external cancellation of a subscription and the Observer instance is expected to expose such capability.

Example:

Disposable d = Observable.just("Hello world!") 
.delay(1, TimeUnit.SECONDS) 
.subscribeWith(new DisposableObserver<String>() { 
@Override public void onStart() { 
System.out.println("Start!"); 
} 
@Override public void onNext(Integer t) { 
System.out.println(t); 
} 
@Override public void onError(Throwable t) { 
t.printStackTrace(); 
} 
@Override public void onComplete() { 
System.out.println("Done!"); 
} 
}); 
Thread.sleep(500); 
// the sequence now can be cancelled via dispose() 
d.dispose();

[中]Observable类是非背压的、可选的多值基本反应类,它提供工厂方法、中间运算符以及使用同步和/或异步反应数据流的能力。
该类中的许多运算符都接受Observable Source,这是此类非背压流的基本反应接口,Observable本身也实现了这一接口。
默认情况下,Observable的运算符以128个元素的缓冲区大小运行(请参见Flowable#bufferSize(),可通过系统参数rx2全局覆盖)。缓冲区大小。然而,大多数操作符都有重载,允许显式设置其内部缓冲区大小。
这个类的文档使用大理石图表。以下图例解释了这些图表:
这个类的设计源于Reactive-Streams design and specification,它删除了任何与背压相关的基础设施和实现细节,替换了org。反应流。使用一次性订阅作为取消流的主要手段。
可观测的遵循协议

onSubscribe onNext* (onError | onComplete)?

其中流可以通过Observer提供给消费者的一次性实例进行处理。订阅。
与版本1的可观察性不同。x、 #subscribe(Observer)不允许从外部取消订阅,而Observer实例将公开此类功能。
例子:

Disposable d = Observable.just("Hello world!") 
.delay(1, TimeUnit.SECONDS) 
.subscribeWith(new DisposableObserver<String>() { 
@Override public void onStart() { 
System.out.println("Start!"); 
} 
@Override public void onNext(Integer t) { 
System.out.println(t); 
} 
@Override public void onError(Throwable t) { 
t.printStackTrace(); 
} 
@Override public void onComplete() { 
System.out.println("Done!"); 
} 
}); 
Thread.sleep(500); 
// the sequence now can be cancelled via dispose() 
d.dispose();

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Observable<Integer> apply(Integer t) {
    Observable<Integer> o = Observable.just(t)
        .subscribeOn(sch)
    ;
    Subject<Integer> subject = UnicastSubject.create();
    o.subscribe(subject);
    return subject;
  }
};

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = IllegalArgumentException.class)
public void testMapWithIssue417() {
  Observable.just(1).observeOn(Schedulers.computation())
      .map(new Function<Integer, Integer>() {
        @Override
        public Integer apply(Integer arg0) {
          throw new IllegalArgumentException("any error");
        }
      }).blockingSingle();
}

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Object apply(Observable<Integer> o) throws Exception {
    return Observable.just(1).window(o).flatMap(new Function<Observable<Integer>, ObservableSource<Integer>>() {
      @Override
      public ObservableSource<Integer> apply(Observable<Integer> v) throws Exception {
        return v;
      }
    });
  }
}, false, 1, 1, (Object[])null);

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Observable<String> apply(final String s) {
    return Observable.just(s)
        .mergeWith(Observable.interval(10, TimeUnit.MILLISECONDS)
        .map(new Function<Long, String>() {
          @Override
          public String apply(Long i) {
            return s + " " + i;
          }
        })).take(250);
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Override
  public Object apply(Observable<Object> o) throws Exception {
    return o.window(Observable.never()).flatMap(new Function<Observable<Object>, ObservableSource<Object>>() {
      @Override
      public ObservableSource<Object> apply(Observable<Object> v) throws Exception {
        return v;
      }
    });
  }
}, false, 1, 1, 1);

代码示例来源:origin: ReactiveX/RxJava

@Override
  public ObservableSource<Object> apply(Integer v) throws Exception {
    return Observable.range(1, 2).map(new Function<Integer, Object>() {
      @Override
      public Object apply(Integer w) throws Exception {
        throw new TestException();
      }
    });
  }
})

代码示例来源:origin: ReactiveX/RxJava

@Test
public void innerEscapeCompleted() {
  Observable<Integer> source = Observable.just(0);
  Observable<Integer> m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER);
  TestObserver<Object> to = new TestObserver<Object>();
  m.subscribe(to);
  to.awaitTerminalEvent();
  to.assertNoErrors();
  System.out.println(to.values());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testBackpressure1() {
  TestObserver<Integer> to = new TestObserver<Integer>();
  Observable.range(1, 100000).takeLast(1)
  .observeOn(Schedulers.newThread())
  .map(newSlowProcessor()).subscribe(to);
  to.awaitTerminalEvent();
  to.assertNoErrors();
  to.assertValue(100000);
}

代码示例来源:origin: ReactiveX/RxJava

@Ignore("Not sure what this does")
@Test
public void addOnNextValueExceptionAdded() throws Exception {
  Observer<BadToString> observer = new BadToStringObserver();
  Observable.just(new BadToString(false))
      .map(new Function<BadToString, BadToString>() {
        @Override
        public BadToString apply(BadToString badToString) {
          throw new IllegalArgumentException("Failure while handling");
        }
      }).subscribe(observer);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testTake2() {
  Observable<Integer> o = Observable.just(1, 2, 3, 4, 5);
  Iterable<String> it = Arrays.asList("a", "b", "c", "d", "e");
  SquareStr squareStr = new SquareStr();
  o.map(squareStr).zipWith(it, concat2Strings).take(2).subscribe(printer);
  assertEquals(2, squareStr.counter.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testTakeFirstWithPredicateOfNoneMatchingThePredicate() {
  Observable<Integer> o = Observable.just(1, 3, 5, 7, 9, 7, 5, 3, 1);
  o.filter(IS_EVEN).take(1).subscribe(w);
  verify(w, never()).onNext(anyInt());
  verify(w, times(1)).onComplete();
  verify(w, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void workerNotDisposedPrematurelyNormalInAsyncOut() {
  DisposeTrackingScheduler s = new DisposeTrackingScheduler();
  TestObserver<Integer> to = new TestObserverFusedCanceling();
  Observable.just(1).hide().observeOn(s).subscribe(to);
  assertEquals(1, s.disposedCount.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testFirstOrElseWithPredicateOfNoneMatchingThePredicateObservable() {
  Observable<String> src = Observable.just("a", "b", "c");
  src.filter(IS_D).first("default").toObservable().subscribe(w);
  verify(w, times(1)).onNext(anyString());
  verify(w, times(1)).onNext("default");
  verify(w, never()).onError(any(Throwable.class));
  verify(w, times(1)).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void nonFusedConditional() {
  TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.SYNC);
  Observable.range(1, 5).hide()
  .doFinally(this)
  .filter(Functions.alwaysTrue())
  .subscribe(to);
  ObserverFusion.assertFusion(to, QueueFuseable.NONE)
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(1, calls);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testError2() {
  Observable<Integer> source = Observable.concat(Observable.just(0),
      Observable.<Integer> error(new TestException("Forced failure")));
  Observable<Integer> m = source.groupBy(identity, dbl).flatMap(FLATTEN_INTEGER);
  TestObserver<Object> to = new TestObserver<Object>();
  m.subscribe(to);
  to.awaitTerminalEvent();
  assertEquals(1, to.errorCount());
  to.assertValueCount(1);
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * This is testing a no-op path since it uses Schedulers.immediate() which will not do scheduling.
 */
@Test
public void testObserveOn() {
  Observer<Integer> observer = TestHelper.mockObserver();
  Observable.just(1, 2, 3).observeOn(ImmediateThinScheduler.INSTANCE).subscribe(observer);
  verify(observer, times(1)).onNext(1);
  verify(observer, times(1)).onNext(2);
  verify(observer, times(1)).onNext(3);
  verify(observer, times(1)).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testFlatMapTransformsOnErrorFuncThrows() {
  Observable<Integer> onNext = Observable.fromIterable(Arrays.asList(1, 2, 3));
  Observable<Integer> onComplete = Observable.fromIterable(Arrays.asList(4));
  Observable<Integer> onError = Observable.fromIterable(Arrays.asList(5));
  Observable<Integer> source = Observable.error(new TestException());
  Observer<Object> o = TestHelper.mockObserver();
  source.flatMap(just(onNext), funcThrow((Throwable) null, onError), just0(onComplete)).subscribe(o);
  verify(o).onError(any(CompositeException.class));
  verify(o, never()).onNext(any());
  verify(o, never()).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void outputFused() {
  TestObserver<Integer> to = ObserverFusion.newTest(QueueFuseable.ANY);
  Observable.range(1, 5).hide()
  .observeOn(Schedulers.single())
  .subscribe(to);
  ObserverFusion.assertFusion(to, QueueFuseable.ASYNC)
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1, 2, 3, 4, 5);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testFlatMapTransformsOnCompletedFuncThrows() {
  Observable<Integer> onNext = Observable.fromIterable(Arrays.asList(1, 2, 3));
  Observable<Integer> onComplete = Observable.fromIterable(Arrays.asList(4));
  Observable<Integer> onError = Observable.fromIterable(Arrays.asList(5));
  Observable<Integer> source = Observable.fromIterable(Arrays.<Integer> asList());
  Observer<Object> o = TestHelper.mockObserver();
  source.flatMap(just(onNext), just(onError), funcThrow0(onComplete)).subscribe(o);
  verify(o).onError(any(TestException.class));
  verify(o, never()).onNext(any());
  verify(o, never()).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testLongTimeAction() throws InterruptedException {
  final CountDownLatch latch = new CountDownLatch(1);
  LongTimeAction action = new LongTimeAction(latch);
  Observable.just(1).buffer(10, TimeUnit.MILLISECONDS, 10)
      .subscribe(action);
  latch.await();
  assertFalse(action.fail);
}

相关文章

Observable类方法