Usage and code examples for the io.reactivex.Observable.take() method

x33g5p2x, reposted on 2022-01-25 under Other

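This article collects code examples for the Java method io.reactivex.Observable.take() and shows how Observable.take() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven and were extracted from selected projects, so they should serve as a useful reference. Details of the Observable.take() method:
Package path: io.reactivex.Observable
Class name: Observable
Method name: take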

About Observable.take

Returns an Observable that emits only the first count items emitted by the source ObservableSource. If the source emits fewer than count items, all of its items are emitted.

The returned Observable invokes a subscribing Observer's Observer#onNext at most count times before invoking Observer#onComplete.

Scheduler: this version of take does not operate by default on a particular Scheduler.
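
The contract described above (at most count calls to onNext, then a single onComplete, with every item passed through when the source is shorter than count) can be modeled without the library. The sketch below is a plain-Java illustration only and is not RxJava's implementation: the TakeModel class, its take method, and the event strings are hypothetical names made up for this example, and an Iterator stands in for the source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical plain-Java model of the take(count) contract; not RxJava code.
class TakeModel {

    // Pulls at most `count` items from `source` and records the signals an
    // observer would receive: one "onNext(x)" per item, then "onComplete".
    static <T> List<String> take(Iterator<T> source, long count) {
        List<String> events = new ArrayList<>();
        long remaining = count;
        // Stop as soon as the limit is reached, so an endless source is safe.
        while (remaining > 0 && source.hasNext()) {
            events.add("onNext(" + source.next() + ")");
            remaining--;
        }
        events.add("onComplete");
        return events;
    }

    public static void main(String[] args) {
        // Source longer than count: only the first two items are forwarded.
        System.out.println(take(Arrays.asList(1, 2, 3, 4, 5).iterator(), 2));
        // Source shorter than count: every item is forwarded.
        System.out.println(take(Arrays.asList("a").iterator(), 3));
    }
}
```

Running main prints [onNext(1), onNext(2), onComplete] followed by [onNext(a), onComplete], which mirrors the two cases in the description. The real operator additionally disposes the upstream subscription once the limit is reached; several of the tests below check that behaviour.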

Code examples

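Code example source: ReactiveX/RxJava

// Excerpt: the apply method of an anonymous Function; the enclosing call is not shown.
@Override
public ObservableSource<Object> apply(Observable<Object> o) throws Exception {
  return o.take(2);  // forward only the first two items
}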

Code example source: ReactiveX/RxJava

@Test
public void disconnectBeforeConnect() {
  BehaviorSubject<Integer> subject = BehaviorSubject.create();

  Observable<Integer> observable = subject
      .replay(1)
      .refCount();

  observable.takeUntil(Observable.just(1)).test();

  subject.onNext(2);

  // take(1) completes after the first item, so exactly one value is asserted
  observable.take(1).test().assertResult(2);
}

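Code example source: ReactiveX/RxJava

@Test(timeout = 2000)
public void testUnsubscribeFromSynchronousInfiniteObservable() {
  final AtomicLong count = new AtomicLong();
  // INFINITE_OBSERVABLE is defined elsewhere in the test class (not shown in this excerpt)
  INFINITE_OBSERVABLE.take(10).subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long l) {
      count.set(l);
    }
  });
  // the test returns only if take(10) stops the otherwise endless source
  assertEquals(10, count.get());
}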

Code example source: ReactiveX/RxJava

// Excerpt: the apply method of an anonymous Function; the enclosing call is not shown.
@Override
public Observable<String> apply(final String s) {
  return Observable.just(s)
      .mergeWith(Observable.interval(10, TimeUnit.MILLISECONDS)
          .map(new Function<Long, String>() {
            @Override
            public String apply(Long i) {
              return s + " " + i;
            }
          }))
      .take(250);  // caps the merged stream at 250 items
}

Code example source: ReactiveX/RxJava

@Test
public void testTake2() {
  // SquareStr, concat2Strings and printer are helpers of the test class (not shown in this excerpt)
  Observable<Integer> o = Observable.just(1, 2, 3, 4, 5);
  Iterable<String> it = Arrays.asList("a", "b", "c", "d", "e");
  SquareStr squareStr = new SquareStr();
  o.map(squareStr).zipWith(it, concat2Strings).take(2).subscribe(printer);
  assertEquals(2, squareStr.counter.get());
}

Code example source: ReactiveX/RxJava

@Test
public void testTake2() {
  Observable<String> w = Observable.fromIterable(Arrays.asList("one", "two", "three"));
  Observable<String> take = w.take(1);
  Observer<String> observer = TestHelper.mockObserver();
  take.subscribe(observer);
  verify(observer, times(1)).onNext("one");
  verify(observer, never()).onNext("two");
  verify(observer, never()).onNext("three");
  verify(observer, never()).onError(any(Throwable.class));
  verify(observer, times(1)).onComplete();
}

Code example source: ReactiveX/RxJava

@Test(expected = IllegalArgumentException.class)
public void testTakeWithError() {
  Observable.fromIterable(Arrays.asList(1, 2, 3)).take(1)
  .map(new Function<Integer, Integer>() {
    @Override
    public Integer apply(Integer t1) {
      throw new IllegalArgumentException("some error");
    }
  }).blockingSingle();
}

Code example source: ReactiveX/RxJava

@Test(timeout = 2000)
public void testRepeatAndTake() {
  Observer<Object> o = TestHelper.mockObserver();
  Observable.just(1).repeat().take(10).subscribe(o);
  verify(o, times(10)).onNext(1);
  verify(o).onComplete();
  verify(o, never()).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

@Test(timeout = 2000)
public void bufferWithSizeSkipTake1() {
  Observable<Integer> source = Observable.just(1).repeat();
  Observable<List<Integer>> result = source.buffer(2, 3).take(1);
  Observer<Object> o = TestHelper.mockObserver();
  result.subscribe(o);
  verify(o).onNext(Arrays.asList(1, 1));
  verify(o).onComplete();
  verify(o, never()).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

@Test(timeout = 2000)
public void testRepeatTake() {
  Observable<Integer> xs = Observable.just(1, 2);
  Object[] ys = xs.repeat().subscribeOn(Schedulers.newThread()).take(4).toList().blockingGet().toArray();
  assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
}

Code example source: ReactiveX/RxJava

@Test
public void take() {
  Observable<Integer> cache = Observable.range(1, 5).cache();
  cache.take(2).test().assertResult(1, 2);
  cache.take(3).test().assertResult(1, 2, 3);
}

Code example source: ReactiveX/RxJava

@Test
public void take() throws Exception {
  Action onCancel = mock(Action.class);
  Observable.range(1, 5)
  .doOnDispose(onCancel)
  .throttleLatest(1, TimeUnit.MINUTES)
  .take(1)
  .test()
  .assertResult(1);
  verify(onCancel).run();
}

Code example source: ReactiveX/RxJava

@Test
public void takeMain() {
  final TestObserver<Integer> to = new TestObserver<Integer>();
  Observable.range(1, 5)
  .concatWith(Single.just(100))
  .take(3)
  .subscribe(to);
  to.assertResult(1, 2, 3);
}

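Code example source: ReactiveX/RxJava

@Test
public void timedDefaultScheduler() {
  // take(long, TimeUnit) limits by elapsed time rather than by item count;
  // all five items arrive well within the one-minute window
  Observable.range(1, 5).take(1, TimeUnit.MINUTES)
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertResult(1, 2, 3, 4, 5);
}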

Code example source: ReactiveX/RxJava

@Test(timeout = 2000)
public void cancel() {
  // the source would emit 20 values (0 to 19); take(10) stops it after the first ten
  Observable.intervalRange(0, 20, 1, 1, TimeUnit.MILLISECONDS, Schedulers.trampoline())
  .take(10)
  .test()
  .assertResult(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
}

Code example source: ReactiveX/RxJava

@Test
public void take() {
  Maybe.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
    @Override
    public Iterable<Integer> apply(Integer v) throws Exception {
      return Arrays.asList(v, v + 1);
    }
  })
  .take(1)
  .test()
  .assertResult(1);
}

Code example source: ReactiveX/RxJava

@Test
public void take() {
  Observable.range(1, 5)
  .mergeWith(Single.just(100))
  .take(3)
  .test()
  .assertResult(1, 2, 3);
}

Code example source: ReactiveX/RxJava

@Test
public void take() {
  Observable.range(1, 5)
  .mergeWith(Maybe.just(100))
  .take(3)
  .test()
  .assertResult(1, 2, 3);
}

Code example source: ReactiveX/RxJava

@Test
public void normalTakeConditional() {
  Observable.range(1, 10)
  .doFinally(this)
  .filter(Functions.alwaysTrue())
  .take(5)
  .test()
  .assertResult(1, 2, 3, 4, 5);
  assertEquals(1, calls);
}

Code example source: ReactiveX/RxJava

@Test
public void selectorTake() {
  PublishSubject<Integer> ps = PublishSubject.create();
  TestObserver<Integer> to = ps
  .timeout(Functions.justFunction(Observable.never()))
  .take(1)
  .test();
  assertTrue(ps.hasObservers());
  ps.onNext(1);
  assertFalse(ps.hasObservers());
  to.assertResult(1);
}
