Usage and code examples of the io.reactivex.Observable.ambArray() method

x33g5p2x, reposted on 2022-01-25 under Other

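This article collects code examples for the Java method io.reactivex.Observable.ambArray() and shows how Observable.ambArray() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, and were extracted from selected projects as practical reference material. The details of Observable.ambArray() are as follows:
Package path: io.reactivex.Observable
Class name: Observable
Method name: ambArray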

Introduction to Observable.ambArray

Mirrors the one ObservableSource in an array of several ObservableSources that first either emits an item or sends a termination notification.

Scheduler: ambArray does not operate by default on a particular Scheduler.
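
The rule described above (mirror whichever source signals first, whether that signal is an item or a termination) can be illustrated without RxJava at all. The following is only a plain-Java model under stated assumptions, not RxJava's implementation: `AmbModel`, `Source`, `winnerIndex`, and `amb` are hypothetical names, each source is reduced to the virtual time of its first signal, and ties are assumed to go to the source listed first, as the `ambArrayOrder` test further down suggests for synchronous sources.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of the "first signal wins" rule; this is not RxJava code. */
final class AmbModel {

    /** Hypothetical source description: when it first signals, what it emits, how it ends. */
    static final class Source {
        final long firstSignalAtMillis; // virtual time of the first item or terminal signal
        final List<String> items;       // items emitted before termination (may be empty)
        final String terminal;          // "complete" or "error"

        Source(long firstSignalAtMillis, String terminal, String... items) {
            this.firstSignalAtMillis = firstSignalAtMillis;
            this.terminal = terminal;
            this.items = Arrays.asList(items);
        }
    }

    /** Index of the source that signals earliest; ties go to the lower index; -1 if there are none. */
    static int winnerIndex(Source... sources) {
        int winner = -1;
        for (int i = 0; i < sources.length; i++) {
            if (winner < 0 || sources[i].firstSignalAtMillis < sources[winner].firstSignalAtMillis) {
                winner = i;
            }
        }
        return winner;
    }

    /** Mirrors the winner: its items followed by its terminal signal; all other sources are ignored. */
    static List<String> amb(Source... sources) {
        List<String> out = new ArrayList<>();
        int w = winnerIndex(sources);
        if (w < 0) {
            out.add("complete"); // an empty array is modelled as an empty, immediately completing source
            return out;
        }
        out.addAll(sources[w].items);
        out.add(sources[w].terminal);
        return out;
    }

    public static void main(String[] args) {
        Source s1 = new Source(2000, "complete", "1", "11");
        Source s2 = new Source(1000, "complete", "2", "22");
        Source s3 = new Source(3000, "complete", "3", "33");
        System.out.println(amb(s1, s2, s3)); // prints [2, 22, complete]
    }
}
```

A source with no items still wins if its termination arrives first, which is the case the `testAmb3` example below exercises with the real operator.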

Code examples

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void ambVarargsNull() {
  Observable.ambArray((Observable<Object>[])null);
}

Code example source: ReactiveX/RxJava

/**
 * Mirrors the ObservableSource (current or provided) that first either emits an item or sends a termination
 * notification.
 * <p>
 * <img width="640" height="385" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ambWith.o.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code ambWith} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param other
 *            an ObservableSource competing to react first. A subscription to this provided source will occur after
 *            subscribing to the current source.
 * @return an Observable that emits the same sequence as whichever of the source ObservableSources first
 *         emitted an item or sent a termination notification
 * @see <a href="http://reactivex.io/documentation/operators/amb.html">ReactiveX operators documentation: Amb</a>
 */
@SuppressWarnings("unchecked")
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> ambWith(ObservableSource<? extends T> other) {
  ObjectHelper.requireNonNull(other, "other is null");
  return ambArray(this, other);
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void noWinnerSuccessDispose() throws Exception {
  for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
    final AtomicBoolean interrupted = new AtomicBoolean();
    final CountDownLatch cdl = new CountDownLatch(1);
    Observable.ambArray(
      Observable.just(1)
        .subscribeOn(Schedulers.single())
        .observeOn(Schedulers.computation()),
      Observable.never()
    )
    .subscribe(new Consumer<Object>() {
      @Override
      public void accept(Object v) throws Exception {
        interrupted.set(Thread.currentThread().isInterrupted());
        cdl.countDown();
      }
    });
    assertTrue(cdl.await(500, TimeUnit.SECONDS));
    assertFalse("Interrupted!", interrupted.get());
  }
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void ambArrayEmpty() {
  assertSame(Observable.empty(), Observable.ambArray());
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void ambArrayOrder() {
  Observable<Integer> error = Observable.error(new RuntimeException());
  Observable.ambArray(Observable.just(1), error).test().assertValue(1).assertComplete();
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void ambVarargsOneIsNull() {
  Observable.ambArray(Observable.never(), null).blockingLast();
}

Code example source: ReactiveX/RxJava

@Test
public void testAmb2() {
  IOException expectedException = new IOException(
      "fake exception");
  Observable<String> observable1 = createObservable(new String[] {},
      2000, new IOException("fake exception"));
  Observable<String> observable2 = createObservable(new String[] {
      "2", "22", "222", "2222" }, 1000, expectedException);
  Observable<String> observable3 = createObservable(new String[] {},
      3000, new IOException("fake exception"));
  @SuppressWarnings("unchecked")
  Observable<String> o = Observable.ambArray(observable1,
      observable2, observable3);
  Observer<String> observer = TestHelper.mockObserver();
  o.subscribe(observer);
  scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS);
  InOrder inOrder = inOrder(observer);
  inOrder.verify(observer, times(1)).onNext("2");
  inOrder.verify(observer, times(1)).onNext("22");
  inOrder.verify(observer, times(1)).onNext("222");
  inOrder.verify(observer, times(1)).onNext("2222");
  inOrder.verify(observer, times(1)).onError(expectedException);
  inOrder.verifyNoMoreInteractions();
}

Code example source: ReactiveX/RxJava

@Test
public void testAmb() {
  Observable<String> observable1 = createObservable(new String[] {
      "1", "11", "111", "1111" }, 2000, null);
  Observable<String> observable2 = createObservable(new String[] {
      "2", "22", "222", "2222" }, 1000, null);
  Observable<String> observable3 = createObservable(new String[] {
      "3", "33", "333", "3333" }, 3000, null);
  @SuppressWarnings("unchecked")
  Observable<String> o = Observable.ambArray(observable1,
      observable2, observable3);
  Observer<String> observer = TestHelper.mockObserver();
  o.subscribe(observer);
  scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS);
  InOrder inOrder = inOrder(observer);
  inOrder.verify(observer, times(1)).onNext("2");
  inOrder.verify(observer, times(1)).onNext("22");
  inOrder.verify(observer, times(1)).onNext("222");
  inOrder.verify(observer, times(1)).onNext("2222");
  inOrder.verify(observer, times(1)).onComplete();
  inOrder.verifyNoMoreInteractions();
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void ambArraySingleElement() {
  assertSame(Observable.never(), Observable.ambArray(Observable.never()));
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void noWinnerCompleteDispose() throws Exception {
  for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
    final AtomicBoolean interrupted = new AtomicBoolean();
    final CountDownLatch cdl = new CountDownLatch(1);
    Observable.ambArray(
      Observable.empty()
        .subscribeOn(Schedulers.single())
        .observeOn(Schedulers.computation()),
      Observable.never()
    )
    .subscribe(Functions.emptyConsumer(), Functions.emptyConsumer(), new Action() {
      @Override
      public void run() throws Exception {
        interrupted.set(Thread.currentThread().isInterrupted());
        cdl.countDown();
      }
    });
    assertTrue(cdl.await(500, TimeUnit.SECONDS));
    assertFalse("Interrupted!", interrupted.get());
  }
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void disposed() {
  TestHelper.checkDisposed(Observable.ambArray(Observable.never(), Observable.never()));
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void noWinnerErrorDispose() throws Exception {
  final TestException ex = new TestException();
  for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
    final AtomicBoolean interrupted = new AtomicBoolean();
    final CountDownLatch cdl = new CountDownLatch(1);
    Observable.ambArray(
      Observable.error(ex)
        .subscribeOn(Schedulers.single())
        .observeOn(Schedulers.computation()),
      Observable.never()
    )
    .subscribe(Functions.emptyConsumer(), new Consumer<Throwable>() {
      @Override
      public void accept(Throwable e) throws Exception {
        interrupted.set(Thread.currentThread().isInterrupted());
        cdl.countDown();
      }
    });
    assertTrue(cdl.await(500, TimeUnit.SECONDS));
    assertFalse("Interrupted!", interrupted.get());
  }
}

Code example source: ReactiveX/RxJava

@Test
public void testAmb3() {
  Observable<String> observable1 = createObservable(new String[] {
      "1" }, 2000, null);
  Observable<String> observable2 = createObservable(new String[] {},
      1000, null);
  Observable<String> observable3 = createObservable(new String[] {
      "3" }, 3000, null);
  @SuppressWarnings("unchecked")
  Observable<String> o = Observable.ambArray(observable1,
      observable2, observable3);
  Observer<String> observer = TestHelper.mockObserver();
  o.subscribe(observer);
  scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS);
  InOrder inOrder = inOrder(observer);
  inOrder.verify(observer, times(1)).onComplete();
  inOrder.verifyNoMoreInteractions();
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void testSubscriptionOnlyHappensOnce() throws InterruptedException {
  final AtomicLong count = new AtomicLong();
  Consumer<Disposable> incrementer = new Consumer<Disposable>() {
    @Override
    public void accept(Disposable d) {
      count.incrementAndGet();
    }
  };
  // this async stream should emit first
  Observable<Integer> o1 = Observable.just(1).doOnSubscribe(incrementer)
      .delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
  //this stream emits second
  Observable<Integer> o2 = Observable.just(1).doOnSubscribe(incrementer)
      .delay(100, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.computation());
  TestObserver<Integer> to = new TestObserver<Integer>();
  Observable.ambArray(o1, o2).subscribe(to);
  to.awaitTerminalEvent(5, TimeUnit.SECONDS);
  to.assertNoErrors();
  assertEquals(2, count.get());
}

Code example source: ReactiveX/RxJava

@Test
public void onCompleteRace() {
  for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
    final PublishSubject<Integer> ps1 = PublishSubject.create();
    final PublishSubject<Integer> ps2 = PublishSubject.create();
    @SuppressWarnings("unchecked")
    TestObserver<Integer> to = Observable.ambArray(ps1, ps2).test();
    Runnable r1 = new Runnable() {
      @Override
      public void run() {
        ps1.onComplete();
      }
    };
    Runnable r2 = new Runnable() {
      @Override
      public void run() {
        ps2.onComplete();
      }
    };
    TestHelper.race(r1, r2);
    to.assertResult();
  }
}

Code example source: ReactiveX/RxJava

@Test
public void onNextRace() {
  for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
    final PublishSubject<Integer> ps1 = PublishSubject.create();
    final PublishSubject<Integer> ps2 = PublishSubject.create();
    @SuppressWarnings("unchecked")
    TestObserver<Integer> to = Observable.ambArray(ps1, ps2).test();
    Runnable r1 = new Runnable() {
      @Override
      public void run() {
        ps1.onNext(1);
      }
    };
    Runnable r2 = new Runnable() {
      @Override
      public void run() {
        ps2.onNext(1);
      }
    };
    TestHelper.race(r1, r2);
    to.assertSubscribed().assertNoErrors()
    .assertNotComplete().assertValueCount(1);
  }
}

Code example source: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void testAmbCancelsOthers() {
  PublishSubject<Integer> source1 = PublishSubject.create();
  PublishSubject<Integer> source2 = PublishSubject.create();
  PublishSubject<Integer> source3 = PublishSubject.create();
  TestObserver<Integer> to = new TestObserver<Integer>();
  Observable.ambArray(source1, source2, source3).subscribe(to);
  assertTrue("Source 1 doesn't have subscribers!", source1.hasObservers());
  assertTrue("Source 2 doesn't have subscribers!", source2.hasObservers());
  assertTrue("Source 3 doesn't have subscribers!", source3.hasObservers());
  source1.onNext(1);
  assertTrue("Source 1 doesn't have subscribers!", source1.hasObservers());
  assertFalse("Source 2 still has subscribers!", source2.hasObservers());
  assertFalse("Source 3 still has subscribers!", source3.hasObservers());
}
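
The test above checks that all sources are subscribed up front and that, once one of them emits, the others lose their observers. The bookkeeping behind that behaviour can be sketched in plain Java. This is a simplified, single-threaded illustration and not how RxJava implements the operator: `AmbCancelSketch` and `MiniSubject` are hypothetical names, `MiniSubject` is only a stand-in for a hot source such as `PublishSubject`, and terminal signals and thread safety are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java sketch of "first signal wins, losers are unsubscribed"; this is not RxJava code. */
final class AmbCancelSketch {

    /** Hypothetical stand-in for a hot source such as PublishSubject (items only, no termination). */
    static final class MiniSubject {
        private final List<Consumer<Integer>> observers = new ArrayList<>();

        /** Registers an observer and returns a handle that removes it again. */
        Runnable subscribe(Consumer<Integer> observer) {
            observers.add(observer);
            return () -> observers.remove(observer);
        }

        void onNext(int value) {
            // Iterate over a copy so observers may be removed while being notified.
            for (Consumer<Integer> o : new ArrayList<>(observers)) {
                o.accept(value);
            }
        }

        boolean hasObservers() {
            return !observers.isEmpty();
        }
    }

    /** Subscribes to every source in array order; the first one to emit keeps its subscription. */
    static void amb(Consumer<Integer> downstream, MiniSubject... sources) {
        int[] winner = {-1}; // -1 means no source has signalled yet
        Runnable[] handles = new Runnable[sources.length];
        for (int i = 0; i < sources.length; i++) {
            final int index = i;
            handles[i] = sources[i].subscribe(value -> {
                if (winner[0] == -1) {
                    winner[0] = index;
                    for (int j = 0; j < handles.length; j++) {
                        if (j != index) {
                            handles[j].run(); // unsubscribe every losing source
                        }
                    }
                }
                if (winner[0] == index) {
                    downstream.accept(value); // only the winner is mirrored downstream
                }
            });
        }
    }

    public static void main(String[] args) {
        MiniSubject source1 = new MiniSubject();
        MiniSubject source2 = new MiniSubject();
        amb(v -> System.out.println("received " + v), source1, source2);
        source1.onNext(1);                           // source1 wins the race
        source2.onNext(2);                           // ignored: source2 was unsubscribed
        System.out.println(source2.hasObservers()); // prints false
    }
}
```

The real operator must also cope with sources that signal concurrently, which is what the `onNextRace` and `onCompleteRace` tests above exercise; this sketch deliberately ignores that.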

Code example source: akarnokd/akarnokd-misc

@Test
@SuppressWarnings("unchecked")
public void test() throws Exception {
  Observable<Object> obs1 = Observable
      .<Object>just("obs 1 event")
      .doOnSubscribe(s -> System.out.println("obs1 sub"))
      .doOnDispose(() -> System.out.println("obs1 unsub"));

  Observable<Object> obs2 = Observable
      .<Object>just("obs 2 event")
      .doOnSubscribe(s -> System.out.println("obs2 sub"))
      .doOnDispose(() -> System.out.println("obs2 unsub"));

  Observable
      .ambArray(obs1, obs2)
      .subscribe(System.out::println);

  Thread.sleep(500);
}
