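This article collects code examples for the io.reactivex.Observable.repeat() method in Java and shows how Observable.repeat() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from a selection of projects, so they should serve as a useful reference. Details of the Observable.repeat() method are as follows: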
Package path: io.reactivex.Observable
Class name: Observable
Method name: repeat
Description: Returns an Observable that repeats the sequence of items emitted by the source ObservableSource indefinitely.
Scheduler: repeat does not operate by default on a particular Scheduler.
Code example source: ReactiveX/RxJava
/**
 * Returns an Observable that repeats the sequence of items emitted by the source ObservableSource indefinitely.
 * <p>
 * <img width="640" height="287" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/repeatInf.o.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code repeat} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @return an Observable that emits the items emitted by the source ObservableSource repeatedly and in sequence
 * @see <a href="http://reactivex.io/documentation/operators/repeat.html">ReactiveX operators documentation: Repeat</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> repeat() {
    return repeat(Long.MAX_VALUE);
}
Code example source: ReactiveX/RxJava
@Test(timeout = 2000)
public void testRepeatZero() {
    Observer<Object> o = TestHelper.mockObserver();
    Observable.just(1).repeat(0).subscribe(o);
    verify(o).onComplete();
    verify(o, never()).onNext(any());
    verify(o, never()).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@Test(timeout = 2000)
public void testRepeatLimited() {
    Observer<Object> o = TestHelper.mockObserver();
    Observable.just(1).repeat(10).subscribe(o);
    verify(o, times(10)).onNext(1);
    verify(o).onComplete();
    verify(o, never()).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@Test(timeout = 2000)
public void testRepeatOne() {
    Observer<Object> o = TestHelper.mockObserver();
    Observable.just(1).repeat(1).subscribe(o);
    verify(o).onComplete();
    verify(o, times(1)).onNext(any());
    verify(o, never()).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@Test
public void repeatLongPredicateInvalid() {
    try {
        Observable.just(1).repeat(-99);
        fail("Should have thrown");
    } catch (IllegalArgumentException ex) {
        assertEquals("times >= 0 required but it was -99", ex.getMessage());
    }
}
Code example source: ReactiveX/RxJava
@Test(timeout = 2000)
public void testRepeatAndTake() {
    Observer<Object> o = TestHelper.mockObserver();
    Observable.just(1).repeat().take(10).subscribe(o);
    verify(o, times(10)).onNext(1);
    verify(o).onComplete();
    verify(o, never()).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@Test(timeout = 2000)
public void bufferWithSizeTake1() {
    Observable<Integer> source = Observable.just(1).repeat();
    Observable<List<Integer>> result = source.buffer(2).take(1);
    Observer<Object> o = TestHelper.mockObserver();
    result.subscribe(o);
    verify(o).onNext(Arrays.asList(1, 1));
    verify(o).onComplete();
    verify(o, never()).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@Test(timeout = 2000)
public void bufferWithSizeSkipTake1() {
    Observable<Integer> source = Observable.just(1).repeat();
    Observable<List<Integer>> result = source.buffer(2, 3).take(1);
    Observer<Object> o = TestHelper.mockObserver();
    result.subscribe(o);
    verify(o).onNext(Arrays.asList(1, 1));
    verify(o).onComplete();
    verify(o, never()).onError(any(Throwable.class));
}
Code example source: ReactiveX/RxJava
@Test(timeout = 2000)
public void testRepeatError() {
    Observer<Object> o = TestHelper.mockObserver();
    Observable.error(new TestException()).repeat(10).subscribe(o);
    verify(o).onError(any(TestException.class));
    verify(o, never()).onNext(any());
    verify(o, never()).onComplete();
}
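The tests above pin down the observable contract of repeat(times): zero repetitions complete without emitting, a negative count is rejected with an IllegalArgumentException, and an error from the source ends the sequence instead of being repeated. The following is a minimal plain-Java sketch of that contract, not RxJava's implementation. The names RepeatSketch, Source, and Outcome are hypothetical and exist only for this illustration; the sketch is synchronous and ignores disposal and schedulers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RepeatSketch {

    /** Hypothetical cold source: each subscribe call replays its items, then returns (completion) or throws (error). */
    interface Source<T> {
        void subscribe(Consumer<T> onNext);
    }

    /** Hypothetical record of what a downstream observer would see. */
    static final class Outcome<T> {
        final List<T> items = new ArrayList<>();
        RuntimeException error; // null means the sequence completed normally
    }

    static <T> Outcome<T> repeat(Source<T> source, long times) {
        if (times < 0) {
            // Same message the repeatLongPredicateInvalid test expects.
            throw new IllegalArgumentException("times >= 0 required but it was " + times);
        }
        Outcome<T> outcome = new Outcome<>();
        for (long i = 0; i < times; i++) {
            try {
                // Resubscribe to the source after each normal completion.
                source.subscribe(outcome.items::add);
            } catch (RuntimeException e) {
                // An error terminates the sequence; it is not repeated.
                outcome.error = e;
                break;
            }
        }
        return outcome;
    }

    public static void main(String[] args) {
        Source<Integer> oneTwo = onNext -> {
            onNext.accept(1);
            onNext.accept(2);
        };
        System.out.println(repeat(oneTwo, 2).items); // prints [1, 2, 1, 2]
        System.out.println(repeat(oneTwo, 0).items); // prints []
    }
}
```

In this model, the no-argument repeat() corresponds to calling repeat(source, Long.MAX_VALUE), which is why the real operator is normally paired with something like take(n) or takeUntil(...) to bound it.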
Code example source: ReactiveX/RxJava
@Test(timeout = 20000)
public void testNoStackOverFlow() {
    Observable.just(1).repeat().subscribeOn(Schedulers.newThread()).take(100000).blockingLast();
}
Code example source: ReactiveX/RxJava
@Test(timeout = 2000)
public void testRepeatTake() {
    Observable<Integer> xs = Observable.just(1, 2);
    Object[] ys = xs.repeat().subscribeOn(Schedulers.newThread()).take(4).toList().blockingGet().toArray();
    assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
}
Code example source: ReactiveX/RxJava
@Test
public void noCancelPreviousRepeat() {
    final AtomicInteger counter = new AtomicInteger();
    Observable<Integer> source = Observable.just(1).doOnDispose(new Action() {
        @Override
        public void run() throws Exception {
            counter.getAndIncrement();
        }
    });
    source.repeat(5)
    .test()
    .assertResult(1, 1, 1, 1, 1);
    assertEquals(0, counter.get());
}
Code example source: ReactiveX/RxJava
@Test(timeout = 2000)
public void testRepetition() {
    int num = 10;
    final AtomicInteger count = new AtomicInteger();
    int value = Observable.unsafeCreate(new ObservableSource<Integer>() {
        @Override
        public void subscribe(final Observer<? super Integer> o) {
            o.onNext(count.incrementAndGet());
            o.onComplete();
        }
    }).repeat().subscribeOn(Schedulers.computation())
    .take(num).blockingLast();
    assertEquals(num, value);
}
Code example source: ReactiveX/RxJava
/** Issue #2844: wrong target of request. */
@Test(timeout = 3000)
public void testRepeatRetarget() {
    final List<Integer> concatBase = new ArrayList<Integer>();
    TestObserver<Integer> to = new TestObserver<Integer>();
    Observable.just(1, 2)
    .repeat(5)
    .concatMap(new Function<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> apply(Integer x) {
            System.out.println("testRepeatRetarget -> " + x);
            concatBase.add(x);
            return Observable.<Integer>empty()
                    .delay(200, TimeUnit.MILLISECONDS);
        }
    })
    .subscribe(to);
    to.awaitTerminalEvent();
    to.assertNoErrors();
    to.assertNoValues();
    assertEquals(Arrays.asList(1, 2, 1, 2, 1, 2, 1, 2, 1, 2), concatBase);
}
Code example source: ReactiveX/RxJava
@Test
public void testRepeatTakeWithSubscribeOn() throws InterruptedException {
    final AtomicInteger counter = new AtomicInteger();
    Observable<Integer> oi = Observable.unsafeCreate(new ObservableSource<Integer>() {
        @Override
        public void subscribe(Observer<? super Integer> sub) {
            sub.onSubscribe(Disposables.empty());
            counter.incrementAndGet();
            sub.onNext(1);
            sub.onNext(2);
            sub.onComplete();
        }
    }).subscribeOn(Schedulers.newThread());
    Object[] ys = oi.repeat().subscribeOn(Schedulers.newThread()).map(new Function<Integer, Integer>() {
        @Override
        public Integer apply(Integer t1) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return t1;
        }
    }).take(4).toList().blockingGet().toArray();
    assertEquals(2, counter.get());
    assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
}
Code example source: ReactiveX/RxJava
/** Issue #2587. */
@Test
public void testRepeatAndDistinctUnbounded() {
    Observable<Integer> src = Observable.fromIterable(Arrays.asList(1, 2, 3, 4, 5))
            .take(3)
            .repeat(3)
            .distinct();
    TestObserver<Integer> to = new TestObserver<Integer>();
    src.subscribe(to);
    to.assertNoErrors();
    to.assertTerminated();
    to.assertValues(1, 2, 3);
}
Code example source: diabolicallabs/vertx-cron
public static Observable<Timed<Long>> cronspec(Scheduler scheduler, String cronspec, String timeZoneName) {
    if (timeZoneName != null) {
        Boolean noneMatch = Arrays.stream(TimeZone.getAvailableIDs()).noneMatch(available -> available.equals(timeZoneName));
        if (noneMatch) throw new IllegalArgumentException("timeZoneName " + timeZoneName + " is invalid");
    }
    return Observable.just(cronspec)
            .flatMap(_cronspec -> {
                CronExpression cron;
                try {
                    cron = new CronExpression(_cronspec);
                    if (timeZoneName != null) {
                        cron.setTimeZone(TimeZone.getTimeZone(timeZoneName));
                    }
                } catch (ParseException e) {
                    throw new IllegalArgumentException("Invalid cronspec " + _cronspec, e);
                }
                return Observable.just(cron)
                        .map(cronExpression -> cronExpression.getNextValidTimeAfter(new Date(new Date().getTime() + 500)))
                        .map(nextRunDate -> nextRunDate.getTime() - new Date().getTime())
                        .flatMap(delay -> Observable.timer(delay, TimeUnit.MILLISECONDS, scheduler))
                        .timestamp()
                        .repeat();
            });
}
Code example source: com.microsoft.azure.v2/azure-client-runtime
Single<HttpResponse> pollUntilDone() {
    return sendPollRequestWithDelay()
            .repeat()
            .takeUntil(new Predicate<HttpResponse>() {
                @Override
                public boolean test(HttpResponse ignored) {
                    return isDone();
                }
            })
            .lastOrError();
}
Code example source: com.microsoft.azure.v2/azure-client-runtime
Observable<OperationStatus<Object>> pollUntilDoneWithStatusUpdates(final HttpRequest originalHttpRequest, final SwaggerMethodParser methodParser, final Type operationStatusResultType) {
    return sendPollRequestWithDelay()
            .flatMap(new Function<HttpResponse, Observable<OperationStatus<Object>>>() {
                @Override
                public Observable<OperationStatus<Object>> apply(HttpResponse httpResponse) {
                    return createOperationStatusObservable(originalHttpRequest, httpResponse, methodParser, operationStatusResultType);
                }
            })
            .repeat()
            .takeUntil(new Predicate<OperationStatus<Object>>() {
                @Override
                public boolean test(OperationStatus<Object> operationStatus) {
                    return isDone();
                }
            });
}
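The two polling examples above share one shape: repeat a request indefinitely, stop after the first response for which a "done" check passes, and (in pollUntilDone) keep only the last response. As a rough, synchronous analogue in plain Java, that control flow might look like the sketch below. PollUntilDoneSketch and its parameters are hypothetical names for illustration only; the sketch does not model delays, errors, or disposal, and it is not how the Azure client implements polling.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

public class PollUntilDoneSketch {

    /**
     * Calls poll repeatedly and returns the last response.
     * The done check runs after each response is received, mirroring
     * takeUntil(Predicate), which emits the item before testing it.
     */
    static <T> T pollUntilDone(Supplier<T> poll, BooleanSupplier isDone) {
        T last;
        do {
            last = poll.get();             // one round of the repeated request
        } while (!isDone.getAsBoolean());  // stop once the operation reports done
        return last;                       // analogue of lastOrError()
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Hypothetical poll: each call returns the attempt number; done after 3 attempts.
        Integer last = pollUntilDone(attempts::incrementAndGet, () -> attempts.get() >= 3);
        System.out.println(last); // prints 3
    }
}
```

Because the check runs after each response, the response that flips the operation to "done" is still delivered, which is what lets lastOrError() return the final status rather than the one before it.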