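This article collects code examples for the Java method io.reactivex.Observable.interval() and shows how Observable.interval() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms. They were extracted from selected projects and are intended as practical reference material. Details of the Observable.interval() method are as follows: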
Fully qualified name: io.reactivex.Observable
Class name: Observable
Method name: interval
Returns an Observable that emits a 0L after the initialDelay and ever increasing numbers after each period of time thereafter.
Scheduler: interval operates by default on the computation Scheduler.
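As a minimal sketch of the behavior described above (not taken from the RxJava sources), the following assumes RxJava 2.x is on the classpath. The class name IntervalSketch, its method names, and the 100 ms / 50 ms timings are illustrative assumptions. The first method drives interval with a TestScheduler so that time is virtual and the result is deterministic; the second uses the default scheduler in real time.

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the name and timings are assumptions for illustration.
public class IntervalSketch {

    // Virtual time: nothing is emitted until the TestScheduler clock is advanced.
    static List<Long> emittedAfter(long millis) {
        TestScheduler scheduler = new TestScheduler();
        // initialDelay = 100 ms, period = 50 ms
        TestObserver<Long> to = Observable
                .interval(100, 50, TimeUnit.MILLISECONDS, scheduler)
                .test();
        scheduler.advanceTimeBy(millis, TimeUnit.MILLISECONDS);
        List<Long> values = new ArrayList<>(to.values());
        to.dispose(); // interval never completes on its own, so dispose explicitly
        return values;
    }

    // Real time: no Scheduler argument, so the default (computation) Scheduler is used.
    static List<Long> firstThreeRealTime() {
        return Observable.interval(10, TimeUnit.MILLISECONDS)
                .take(3)      // bound the otherwise infinite sequence
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(emittedAfter(99));       // [] (initial delay not yet elapsed)
        System.out.println(emittedAfter(100));      // [0]
        System.out.println(emittedAfter(200));      // [0, 1, 2]
        System.out.println(firstThreeRealTime());   // [0, 1, 2]
    }
}
```

Passing a TestScheduler keeps the example independent of wall-clock timing, which is the same approach most of the RxJava tests listed below take.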
Code example source: ReactiveX/RxJava
// Fragment: body of an anonymous Function<Long, Observable<Long>>
@Override
public Observable<Long> apply(Long t1) {
    return Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
}
};
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void intervalSchedulerNull() {
    Observable.interval(1, TimeUnit.SECONDS, null);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void intervalPeriodUnitNull() {
    Observable.interval(1, 1, null);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void intervalUnitNull() {
    Observable.interval(1, null);
}
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void intervalPeriodSchedulerNull() {
    Observable.interval(1, 1, TimeUnit.SECONDS, null);
}
Code example source: ReactiveX/RxJava
@Override
public Observable<String> apply(final String s) {
    return Observable.just(s)
            .mergeWith(Observable.interval(10, TimeUnit.MILLISECONDS)
            .map(new Function<Long, String>() {
                @Override
                public String apply(Long i) {
                    return s + " " + i;
                }
            })).take(250);
}
})
Code example source: ReactiveX/RxJava
@Test(timeout = 2000)
public void bufferWithBoundaryTake2() {
    Observable<Long> boundary = Observable.interval(60, 60, TimeUnit.MILLISECONDS, scheduler);
    Observable<Long> source = Observable.interval(40, 40, TimeUnit.MILLISECONDS, scheduler);
    Observable<List<Long>> result = source.buffer(boundary).take(2);
    Observer<Object> o = TestHelper.mockObserver();
    InOrder inOrder = inOrder(o);
    result.subscribe(o);
    scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
    inOrder.verify(o).onNext(Arrays.asList(0L));
    inOrder.verify(o).onNext(Arrays.asList(1L));
    inOrder.verify(o).onComplete();
    verify(o, never()).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@Test(timeout = 2000)
public void bufferWithTimeSkipTake2() {
    Observable<Long> source = Observable.interval(40, 40, TimeUnit.MILLISECONDS, scheduler);
    Observable<List<Long>> result = source.buffer(100, 60, TimeUnit.MILLISECONDS, scheduler).take(2);
    Observer<Object> o = TestHelper.mockObserver();
    InOrder inOrder = inOrder(o);
    result.subscribe(o);
    scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
    inOrder.verify(o).onNext(Arrays.asList(0L, 1L));
    inOrder.verify(o).onNext(Arrays.asList(1L, 2L));
    inOrder.verify(o).onComplete();
    verify(o, never()).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@Test
public void bufferWithTimeAndSize() {
    Observable<Long> source = Observable.interval(30, 30, TimeUnit.MILLISECONDS, scheduler);
    Observable<List<Long>> result = source.buffer(100, TimeUnit.MILLISECONDS, scheduler, 2).take(3);
    Observer<Object> o = TestHelper.mockObserver();
    InOrder inOrder = inOrder(o);
    result.subscribe(o);
    scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
    inOrder.verify(o).onNext(Arrays.asList(0L, 1L));
    inOrder.verify(o).onNext(Arrays.asList(2L));
    inOrder.verify(o).onComplete();
    verify(o, never()).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@Test(timeout = 2000)
public void bufferWithTimeTake1() {
    Observable<Long> source = Observable.interval(40, 40, TimeUnit.MILLISECONDS, scheduler);
    Observable<List<Long>> result = source.buffer(100, TimeUnit.MILLISECONDS, scheduler).take(1);
    Observer<Object> o = TestHelper.mockObserver();
    result.subscribe(o);
    scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
    verify(o).onNext(Arrays.asList(0L, 1L));
    verify(o).onComplete();
    verify(o, never()).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@Test
public void testGroupByOnAsynchronousSourceAcceptsMultipleSubscriptions() throws InterruptedException {
    // choose an asynchronous source
    Observable<Long> source = Observable.interval(10, TimeUnit.MILLISECONDS).take(1);
    // apply groupBy to the source
    Observable<GroupedObservable<Boolean, Long>> stream = source.groupBy(IS_EVEN);
    // create two observers
    Observer<GroupedObservable<Boolean, Long>> o1 = TestHelper.mockObserver();
    Observer<GroupedObservable<Boolean, Long>> o2 = TestHelper.mockObserver();
    // subscribe with the observers
    stream.subscribe(o1);
    stream.subscribe(o2);
    // check that subscriptions were successful
    verify(o1, never()).onError(Mockito.<Throwable> any());
    verify(o2, never()).onError(Mockito.<Throwable> any());
}
Code example source: ReactiveX/RxJava
@Test(timeout = 1000, expected = NoSuchElementException.class)
public void testSimpleJustNext() {
    TestScheduler scheduler = new TestScheduler();
    Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10);
    Iterable<Long> iter = source.blockingLatest();
    Iterator<Long> it = iter.iterator();
    // only 9 because take(10) will immediately call onComplete when receiving the 10th item,
    // and that onComplete will overwrite the previous value
    for (int i = 0; i < 10; i++) {
        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        Assert.assertEquals(Long.valueOf(i), it.next());
    }
}
Code example source: ReactiveX/RxJava
@Test
public void dispose() {
    TestHelper.checkDisposed(Observable.interval(1, TimeUnit.MILLISECONDS, new TestScheduler()));
}
Code example source: ReactiveX/RxJava
@Test(timeout = 1000)
public void testSingleSourceManyIterators() {
    TestScheduler scheduler = new TestScheduler();
    Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10);
    Iterable<Long> iter = source.blockingMostRecent(-1L);
    for (int j = 0; j < 3; j++) {
        Iterator<Long> it = iter.iterator();
        Assert.assertEquals(Long.valueOf(-1), it.next());
        for (int i = 0; i < 9; i++) {
            scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
            Assert.assertEquals(true, it.hasNext());
            Assert.assertEquals(Long.valueOf(i), it.next());
        }
        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        Assert.assertEquals(false, it.hasNext());
    }
}
Code example source: ReactiveX/RxJava
@Test(timeout = 1000)
public void testSimple() {
    TestScheduler scheduler = new TestScheduler();
    Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10);
    Iterable<Long> iter = source.blockingLatest();
    Iterator<Long> it = iter.iterator();
    // only 9 because take(10) will immediately call onComplete when receiving the 10th item,
    // and that onComplete will overwrite the previous value
    for (int i = 0; i < 9; i++) {
        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        Assert.assertEquals(true, it.hasNext());
        Assert.assertEquals(Long.valueOf(i), it.next());
    }
    scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    Assert.assertEquals(false, it.hasNext());
}
Code example source: ReactiveX/RxJava
@Test(timeout = 1000)
public void testSameSourceMultipleIterators() {
    TestScheduler scheduler = new TestScheduler();
    Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10);
    Iterable<Long> iter = source.blockingLatest();
    for (int j = 0; j < 3; j++) {
        Iterator<Long> it = iter.iterator();
        // only 9 because take(10) will immediately call onComplete when receiving the 10th item,
        // and that onComplete will overwrite the previous value
        for (int i = 0; i < 9; i++) {
            scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
            Assert.assertEquals(true, it.hasNext());
            Assert.assertEquals(Long.valueOf(i), it.next());
        }
        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        Assert.assertEquals(false, it.hasNext());
    }
}
Code example source: ReactiveX/RxJava
@Test(timeout = 2000)
public void cancel() {
    Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.trampoline())
            .take(10)
            .test()
            .assertResult(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
}
Code example source: ReactiveX/RxJava
@Test /* (timeout = 8000) */
public void testSingleSourceManyIterators() throws InterruptedException {
    Observable<Long> o = Observable.interval(250, TimeUnit.MILLISECONDS);
    PublishSubject<Integer> terminal = PublishSubject.create();
    Observable<Long> source = o.takeUntil(terminal);
    Iterable<Long> iter = source.blockingNext();
    for (int j = 0; j < 3; j++) {
        BlockingObservableNext.NextIterator<Long> it = (BlockingObservableNext.NextIterator<Long>)iter.iterator();
        for (long i = 0; i < 10; i++) {
            Assert.assertEquals(true, it.hasNext());
            Assert.assertEquals(j + "th iteration next", Long.valueOf(i), it.next());
        }
        terminal.onNext(1);
    }
}
Code example source: ReactiveX/RxJava
@Test
public void testTimerPeriodically() {
    TestObserver<Long> to = new TestObserver<Long>();
    Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler).subscribe(to);
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    to.assertValue(0L);
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    to.assertValues(0L, 1L);
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    to.assertValues(0L, 1L, 2L);
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    to.assertValues(0L, 1L, 2L, 3L);
    to.dispose();
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
    to.assertValues(0L, 1L, 2L, 3L);
    to.assertNotComplete();
    to.assertNoErrors();
}
Code example source: ReactiveX/RxJava
@Test
public void testConnectWithNoSubscriber() {
    TestScheduler scheduler = new TestScheduler();
    ConnectableObservable<Long> co = Observable.interval(10, 10, TimeUnit.MILLISECONDS, scheduler).take(3).publish();
    co.connect();
    // Emit 0
    scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);
    TestObserver<Long> to = new TestObserver<Long>();
    co.subscribe(to);
    // Emit 1 and 2
    scheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS);
    to.assertValues(1L, 2L);
    to.assertNoErrors();
    to.assertTerminated();
}