Usage and code examples for the io.reactivex.Observable.interval() method

x33g5p2x, reposted on 2022-01-25 under "Other"

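This article collects code examples for the io.reactivex.Observable.interval() method in Java and shows how Observable.interval() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms, and were extracted from selected projects, so they should be a useful reference. Details of the Observable.interval() method:
Fully qualified class: io.reactivex.Observable
Class name: Observable
Method name: interval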

About Observable.interval

Returns an Observable that emits 0L after the initialDelay and then ever-increasing numbers after each subsequent period.

Scheduler: by default, interval operates on the computation Scheduler.
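
As a rough illustration of the timing described above, the following JDK-only sketch models which values would have been emitted by a given point in virtual time. It does not call RxJava: the class IntervalTimeline and the method emittedBy are hypothetical names introduced here, and the model assumes a positive period, a single time unit for all arguments, and no scheduler jitter.

```java
import java.util.ArrayList;
import java.util.List;

public class IntervalTimeline {

    // Hypothetical helper (not part of RxJava): lists the values that an
    // interval with the given initialDelay and period would have emitted
    // once `elapsed` time units have passed. The first value, 0L, appears
    // at t = initialDelay; each later value appears one period after the last.
    static List<Long> emittedBy(long initialDelay, long period, long elapsed) {
        if (period <= 0) {
            // Restriction of this model only, to keep the loop finite.
            throw new IllegalArgumentException("period must be positive");
        }
        List<Long> values = new ArrayList<>();
        long next = 0;
        for (long t = initialDelay; t <= elapsed; t += period) {
            values.add(next++);
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(emittedBy(100, 100, 99));   // prints []
        System.out.println(emittedBy(100, 100, 100));  // prints [0]
        System.out.println(emittedBy(100, 100, 400));  // prints [0, 1, 2, 3]
    }
}
```

The last call matches the testTimerPeriodically example further down, where an observer of interval(100, 100, TimeUnit.MILLISECONDS, scheduler) has seen 0L through 3L after 400 ms of virtual time.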

Code examples

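Code example source: ReactiveX/RxJava

// Excerpt: apparently the apply method of an anonymous Function<Long, Observable<Long>>;
// the enclosing declaration is not part of the excerpt.
  @Override
  public Observable<Long> apply(Long t1) {
    return Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
  }
};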

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void intervalSchedulerNull() {
  Observable.interval(1, TimeUnit.SECONDS, null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void intervalPeriodUnitNull() {
  Observable.interval(1, 1, null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void intervalUnitNull() {
  Observable.interval(1, null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void intervalPeriodSchedulerNull() {
  Observable.interval(1, 1, TimeUnit.SECONDS, null);
}

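Code example source: ReactiveX/RxJava

// Excerpt: apparently the apply method of an anonymous Function<String, Observable<String>>;
// the enclosing call is not part of the excerpt.
  @Override
  public Observable<String> apply(final String s) {
    return Observable.just(s)
        .mergeWith(Observable.interval(10, TimeUnit.MILLISECONDS)
            .map(new Function<Long, String>() {
              @Override
              public String apply(Long i) {
                return s + " " + i;
              }
            }))
        .take(250);
  }
})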

Code example source: ReactiveX/RxJava

@Test(timeout = 2000)
public void bufferWithBoundaryTake2() {
  Observable<Long> boundary = Observable.interval(60, 60, TimeUnit.MILLISECONDS, scheduler);
  Observable<Long> source = Observable.interval(40, 40, TimeUnit.MILLISECONDS, scheduler);
  Observable<List<Long>> result = source.buffer(boundary).take(2);
  Observer<Object> o = TestHelper.mockObserver();
  InOrder inOrder = inOrder(o);
  result.subscribe(o);
  scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
  inOrder.verify(o).onNext(Arrays.asList(0L));
  inOrder.verify(o).onNext(Arrays.asList(1L));
  inOrder.verify(o).onComplete();
  verify(o, never()).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

@Test(timeout = 2000)
public void bufferWithTimeSkipTake2() {
  Observable<Long> source = Observable.interval(40, 40, TimeUnit.MILLISECONDS, scheduler);
  Observable<List<Long>> result = source.buffer(100, 60, TimeUnit.MILLISECONDS, scheduler).take(2);
  Observer<Object> o = TestHelper.mockObserver();
  InOrder inOrder = inOrder(o);
  result.subscribe(o);
  scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
  inOrder.verify(o).onNext(Arrays.asList(0L, 1L));
  inOrder.verify(o).onNext(Arrays.asList(1L, 2L));
  inOrder.verify(o).onComplete();
  verify(o, never()).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

@Test
public void bufferWithTimeAndSize() {
  Observable<Long> source = Observable.interval(30, 30, TimeUnit.MILLISECONDS, scheduler);
  Observable<List<Long>> result = source.buffer(100, TimeUnit.MILLISECONDS, scheduler, 2).take(3);
  Observer<Object> o = TestHelper.mockObserver();
  InOrder inOrder = inOrder(o);
  result.subscribe(o);
  scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
  inOrder.verify(o).onNext(Arrays.asList(0L, 1L));
  inOrder.verify(o).onNext(Arrays.asList(2L));
  inOrder.verify(o).onComplete();
  verify(o, never()).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

@Test(timeout = 2000)
public void bufferWithTimeTake1() {
  Observable<Long> source = Observable.interval(40, 40, TimeUnit.MILLISECONDS, scheduler);
  Observable<List<Long>> result = source.buffer(100, TimeUnit.MILLISECONDS, scheduler).take(1);
  Observer<Object> o = TestHelper.mockObserver();
  result.subscribe(o);
  scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
  verify(o).onNext(Arrays.asList(0L, 1L));
  verify(o).onComplete();
  verify(o, never()).onError(any(Throwable.class));
}

Code example source: ReactiveX/RxJava

@Test
public void testGroupByOnAsynchronousSourceAcceptsMultipleSubscriptions() throws InterruptedException {
  // choose an asynchronous source
  Observable<Long> source = Observable.interval(10, TimeUnit.MILLISECONDS).take(1);
  // apply groupBy to the source
  Observable<GroupedObservable<Boolean, Long>> stream = source.groupBy(IS_EVEN);
  // create two observers
  Observer<GroupedObservable<Boolean, Long>> o1 = TestHelper.mockObserver();
  Observer<GroupedObservable<Boolean, Long>> o2 = TestHelper.mockObserver();
  // subscribe with the observers
  stream.subscribe(o1);
  stream.subscribe(o2);
  // check that subscriptions were successful
  verify(o1, never()).onError(Mockito.<Throwable> any());
  verify(o2, never()).onError(Mockito.<Throwable> any());
}

Code example source: ReactiveX/RxJava

@Test(timeout = 1000, expected = NoSuchElementException.class)
public void testSimpleJustNext() {
  TestScheduler scheduler = new TestScheduler();
  Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10);
  Iterable<Long> iter = source.blockingLatest();
  Iterator<Long> it = iter.iterator();
  // take(10) calls onComplete immediately after the 10th item, and that onComplete
  // overwrites the previous value, so only 9 values reach the iterator;
  // the 10th it.next() in this loop therefore throws NoSuchElementException
  for (int i = 0; i < 10; i++) {
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    Assert.assertEquals(Long.valueOf(i), it.next());
  }
}

Code example source: ReactiveX/RxJava

@Test
public void dispose() {
  TestHelper.checkDisposed(Observable.interval(1, TimeUnit.MILLISECONDS, new TestScheduler()));
}

Code example source: ReactiveX/RxJava

@Test(timeout = 1000)
public void testSingleSourceManyIterators() {
  TestScheduler scheduler = new TestScheduler();
  Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10);
  Iterable<Long> iter = source.blockingMostRecent(-1L);
  for (int j = 0; j < 3; j++) {
    Iterator<Long> it = iter.iterator();
    Assert.assertEquals(Long.valueOf(-1), it.next());
    for (int i = 0; i < 9; i++) {
      scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
      Assert.assertEquals(true, it.hasNext());
      Assert.assertEquals(Long.valueOf(i), it.next());
    }
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    Assert.assertEquals(false, it.hasNext());
  }
}

Code example source: ReactiveX/RxJava

@Test(timeout = 1000)
public void testSimple() {
  TestScheduler scheduler = new TestScheduler();
  Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10);
  Iterable<Long> iter = source.blockingLatest();
  Iterator<Long> it = iter.iterator();
  // only 9 values: take(10) calls onComplete immediately after the 10th item,
  // and that onComplete overwrites the previous value
  for (int i = 0; i < 9; i++) {
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    Assert.assertEquals(true, it.hasNext());
    Assert.assertEquals(Long.valueOf(i), it.next());
  }
  scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
  Assert.assertEquals(false, it.hasNext());
}

Code example source: ReactiveX/RxJava

@Test(timeout = 1000)
public void testSameSourceMultipleIterators() {
  TestScheduler scheduler = new TestScheduler();
  Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10);
  Iterable<Long> iter = source.blockingLatest();
  for (int j = 0; j < 3; j++) {
    Iterator<Long> it = iter.iterator();
    // only 9 values: take(10) calls onComplete immediately after the 10th item,
    // and that onComplete overwrites the previous value
    for (int i = 0; i < 9; i++) {
      scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
      Assert.assertEquals(true, it.hasNext());
      Assert.assertEquals(Long.valueOf(i), it.next());
    }
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    Assert.assertEquals(false, it.hasNext());
  }
}

Code example source: ReactiveX/RxJava

@Test(timeout = 2000)
public void cancel() {
  Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.trampoline())
  .take(10)
  .test()
  .assertResult(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
}

Code example source: ReactiveX/RxJava

@Test /* (timeout = 8000) */
public void testSingleSourceManyIterators() throws InterruptedException {
  Observable<Long> o = Observable.interval(250, TimeUnit.MILLISECONDS);
  PublishSubject<Integer> terminal = PublishSubject.create();
  Observable<Long> source = o.takeUntil(terminal);
  Iterable<Long> iter = source.blockingNext();
  for (int j = 0; j < 3; j++) {
    BlockingObservableNext.NextIterator<Long> it = (BlockingObservableNext.NextIterator<Long>)iter.iterator();
    for (long i = 0; i < 10; i++) {
      Assert.assertEquals(true, it.hasNext());
      Assert.assertEquals(j + "th iteration next", Long.valueOf(i), it.next());
    }
    terminal.onNext(1);
  }
}

Code example source: ReactiveX/RxJava

@Test
public void testTimerPeriodically() {
  TestObserver<Long> to = new TestObserver<Long>();
  Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler).subscribe(to);
  scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
  to.assertValue(0L);
  scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
  to.assertValues(0L, 1L);
  scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
  to.assertValues(0L, 1L, 2L);
  scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
  to.assertValues(0L, 1L, 2L, 3L);
  to.dispose();
  scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
  to.assertValues(0L, 1L, 2L, 3L);
  to.assertNotComplete();
  to.assertNoErrors();
}

Code example source: ReactiveX/RxJava

@Test
public void testConnectWithNoSubscriber() {
  TestScheduler scheduler = new TestScheduler();
  ConnectableObservable<Long> co = Observable.interval(10, 10, TimeUnit.MILLISECONDS, scheduler).take(3).publish();
  co.connect();
  // Emit 0
  scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);
  TestObserver<Long> to = new TestObserver<Long>();
  co.subscribe(to);
  // Emit 1 and 2
  scheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS);
  to.assertValues(1L, 2L);
  to.assertNoErrors();
  to.assertTerminated();
}
