io.reactivex.Observable.repeat()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(9.5k)|赞(0)|评价(0)|浏览(128)

本文整理了Java中io.reactivex.Observable.repeat()方法的一些代码示例,展示了Observable.repeat()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.repeat()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:repeat

Observable.repeat介绍

[英]Returns an Observable that repeats the sequence of items emitted by the source ObservableSource indefinitely.

Scheduler: repeat does not operate by default on a particular Scheduler.
[中]

代码示例

代码示例来源:origin: ReactiveX/RxJava

/**
 * Returns an Observable that repeats the sequence of items emitted by the source ObservableSource indefinitely.
 * <p>
 * <img width="640" height="287" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/repeatInf.o.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code repeat} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @return an Observable that emits the items emitted by the source ObservableSource repeatedly and in sequence
 * @see <a href="http://reactivex.io/documentation/operators/repeat.html">ReactiveX operators documentation: Repeat</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> repeat() {
  return repeat(Long.MAX_VALUE);
}

代码示例来源:origin: redisson/redisson

/**
 * Returns an Observable that repeats the sequence of items emitted by the source ObservableSource indefinitely.
 * <p>
 * <img width="640" height="287" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/repeatInf.o.png" alt="">
 * <dl>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code repeat} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @return an Observable that emits the items emitted by the source ObservableSource repeatedly and in sequence
 * @see <a href="http://reactivex.io/documentation/operators/repeat.html">ReactiveX operators documentation: Repeat</a>
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> repeat() {
  return repeat(Long.MAX_VALUE);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 2000)
public void testRepeatZero() {
  Observer<Object> o = TestHelper.mockObserver();
  Observable.just(1).repeat(0).subscribe(o);
  verify(o).onComplete();
  verify(o, never()).onNext(any());
  verify(o, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 2000)
public void testRepeatLimited() {
  Observer<Object> o = TestHelper.mockObserver();
  Observable.just(1).repeat(10).subscribe(o);
  verify(o, times(10)).onNext(1);
  verify(o).onComplete();
  verify(o, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 2000)
public void testRepeatOne() {
  Observer<Object> o = TestHelper.mockObserver();
  Observable.just(1).repeat(1).subscribe(o);
  verify(o).onComplete();
  verify(o, times(1)).onNext(any());
  verify(o, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void repeatLongPredicateInvalid() {
  try {
    Observable.just(1).repeat(-99);
    fail("Should have thrown");
  } catch (IllegalArgumentException ex) {
    assertEquals("times >= 0 required but it was -99", ex.getMessage());
  }
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 2000)
public void testRepeatAndTake() {
  Observer<Object> o = TestHelper.mockObserver();
  Observable.just(1).repeat().take(10).subscribe(o);
  verify(o, times(10)).onNext(1);
  verify(o).onComplete();
  verify(o, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 2000)
public void bufferWithSizeTake1() {
  Observable<Integer> source = Observable.just(1).repeat();
  Observable<List<Integer>> result = source.buffer(2).take(1);
  Observer<Object> o = TestHelper.mockObserver();
  result.subscribe(o);
  verify(o).onNext(Arrays.asList(1, 1));
  verify(o).onComplete();
  verify(o, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 2000)
public void bufferWithSizeSkipTake1() {
  Observable<Integer> source = Observable.just(1).repeat();
  Observable<List<Integer>> result = source.buffer(2, 3).take(1);
  Observer<Object> o = TestHelper.mockObserver();
  result.subscribe(o);
  verify(o).onNext(Arrays.asList(1, 1));
  verify(o).onComplete();
  verify(o, never()).onError(any(Throwable.class));
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 2000)
public void testRepeatError() {
  Observer<Object> o = TestHelper.mockObserver();
  Observable.error(new TestException()).repeat(10).subscribe(o);
  verify(o).onError(any(TestException.class));
  verify(o, never()).onNext(any());
  verify(o, never()).onComplete();
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 20000)
public void testNoStackOverFlow() {
  Observable.just(1).repeat().subscribeOn(Schedulers.newThread()).take(100000).blockingLast();
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 2000)
public void testRepeatTake() {
  Observable<Integer> xs = Observable.just(1, 2);
  Object[] ys = xs.repeat().subscribeOn(Schedulers.newThread()).take(4).toList().blockingGet().toArray();
  assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void noCancelPreviousRepeat() {
  final AtomicInteger counter = new AtomicInteger();
  Observable<Integer> source = Observable.just(1).doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
      counter.getAndIncrement();
    }
  });
  source.repeat(5)
  .test()
  .assertResult(1, 1, 1, 1, 1);
  assertEquals(0, counter.get());
}

代码示例来源:origin: ReactiveX/RxJava

@Test(timeout = 2000)
public void testRepetition() {
  int num = 10;
  final AtomicInteger count = new AtomicInteger();
  int value = Observable.unsafeCreate(new ObservableSource<Integer>() {
    @Override
    public void subscribe(final Observer<? super Integer> o) {
      o.onNext(count.incrementAndGet());
      o.onComplete();
    }
  }).repeat().subscribeOn(Schedulers.computation())
  .take(num).blockingLast();
  assertEquals(num, value);
}

代码示例来源:origin: ReactiveX/RxJava

/** Issue #2844: wrong target of request. */
@Test(timeout = 3000)
public void testRepeatRetarget() {
  final List<Integer> concatBase = new ArrayList<Integer>();
  TestObserver<Integer> to = new TestObserver<Integer>();
  Observable.just(1, 2)
  .repeat(5)
  .concatMap(new Function<Integer, Observable<Integer>>() {
    @Override
    public Observable<Integer> apply(Integer x) {
      System.out.println("testRepeatRetarget -> " + x);
      concatBase.add(x);
      return Observable.<Integer>empty()
          .delay(200, TimeUnit.MILLISECONDS);
    }
  })
  .subscribe(to);
  to.awaitTerminalEvent();
  to.assertNoErrors();
  to.assertNoValues();
  assertEquals(Arrays.asList(1, 2, 1, 2, 1, 2, 1, 2, 1, 2), concatBase);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
public void testRepeatTakeWithSubscribeOn() throws InterruptedException {
  final AtomicInteger counter = new AtomicInteger();
  Observable<Integer> oi = Observable.unsafeCreate(new ObservableSource<Integer>() {
    @Override
    public void subscribe(Observer<? super Integer> sub) {
      sub.onSubscribe(Disposables.empty());
      counter.incrementAndGet();
      sub.onNext(1);
      sub.onNext(2);
      sub.onComplete();
    }
  }).subscribeOn(Schedulers.newThread());
  Object[] ys = oi.repeat().subscribeOn(Schedulers.newThread()).map(new Function<Integer, Integer>() {
    @Override
    public Integer apply(Integer t1) {
      try {
        Thread.sleep(50);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      return t1;
    }
  }).take(4).toList().blockingGet().toArray();
  assertEquals(2, counter.get());
  assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
}

代码示例来源:origin: ReactiveX/RxJava

/** Issue #2587. */
@Test
public void testRepeatAndDistinctUnbounded() {
  Observable<Integer> src = Observable.fromIterable(Arrays.asList(1, 2, 3, 4, 5))
      .take(3)
      .repeat(3)
      .distinct();
  TestObserver<Integer> to = new TestObserver<Integer>();
  src.subscribe(to);
  to.assertNoErrors();
  to.assertTerminated();
  to.assertValues(1, 2, 3);
}

代码示例来源:origin: diabolicallabs/vertx-cron

public static Observable<Timed<Long>> cronspec(Scheduler scheduler, String cronspec, String timeZoneName) {
 if (timeZoneName != null) {
  Boolean noneMatch = Arrays.stream(TimeZone.getAvailableIDs()).noneMatch(available -> available.equals(timeZoneName));
  if (noneMatch) throw new IllegalArgumentException("timeZoneName " + timeZoneName + " is invalid");
 }
 return Observable.just(cronspec)
  .flatMap(_cronspec -> {
   CronExpression cron;
   try {
    cron = new CronExpression(_cronspec);
    if (timeZoneName != null) {
     cron.setTimeZone(TimeZone.getTimeZone(timeZoneName));
    }
   } catch (ParseException e) {
    throw new IllegalArgumentException("Invalid cronspec " + _cronspec, e);
   }
   return Observable.just(cron)
    .map(cronExpression -> cronExpression.getNextValidTimeAfter(new Date(new Date().getTime() + 500)))
    .map(nextRunDate -> nextRunDate.getTime() - new Date().getTime())
    .flatMap(delay -> Observable.timer(delay, TimeUnit.MILLISECONDS, scheduler))
    .timestamp()
    .repeat();
  });
}

代码示例来源:origin: com.microsoft.azure.v2/azure-client-runtime

Single<HttpResponse> pollUntilDone() {
  return sendPollRequestWithDelay()
      .repeat()
      .takeUntil(new Predicate<HttpResponse>() {
        @Override
        public boolean test(HttpResponse ignored) {
          return isDone();
        }
      })
      .lastOrError();
}

代码示例来源:origin: com.microsoft.azure.v2/azure-client-runtime

Observable<OperationStatus<Object>> pollUntilDoneWithStatusUpdates(final HttpRequest originalHttpRequest, final SwaggerMethodParser methodParser, final Type operationStatusResultType) {
  return sendPollRequestWithDelay()
        .flatMap(new Function<HttpResponse, Observable<OperationStatus<Object>>>() {
          @Override
          public Observable<OperationStatus<Object>> apply(HttpResponse httpResponse) {
            return createOperationStatusObservable(originalHttpRequest, httpResponse, methodParser, operationStatusResultType);
          }
        })
        .repeat()
        .takeUntil(new Predicate<OperationStatus<Object>>() {
          @Override
          public boolean test(OperationStatus<Object> operationStatus) {
            return isDone();
          }
        });
}

相关文章

Observable类方法