Usage and code examples of the io.reactivex.Observable.retryWhen() method

x33g5p2x, reposted on 2022-01-25 in Other

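This article collects code examples for the Java method io.reactivex.Observable.retryWhen() and shows how Observable.retryWhen() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are intended as practical references. Details of the Observable.retryWhen() method are as follows:
Package path: io.reactivex.Observable
Class name: Observable
Method name: retryWhen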

Introduction to Observable.retryWhen

Returns an Observable that emits the same values as the source ObservableSource with the exception of an onError. An onError notification from the source will result in the emission of a Throwable item to the ObservableSource provided as an argument to the notificationHandler function. If that ObservableSource calls onComplete or onError then retry will call onComplete or onError on the child subscription. Otherwise, this ObservableSource will resubscribe to the source ObservableSource.

Example: This retries 3 times, each time incrementing the number of seconds it waits.

Observable.<String>create(s -> {
    System.out.println("subscribing");
    s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
    // pair each error with the next value of 1..3 and wait that many seconds
    return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
        System.out.println("delay retry by " + i + " second(s)");
        return Observable.timer(i, TimeUnit.SECONDS);
    });
}).blockingForEach(System.out::println);

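Output is:

subscribing
delay retry by 1 second(s)
subscribing
delay retry by 2 second(s)
subscribing
delay retry by 3 second(s)
subscribing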
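The control flow of the example above can be modelled without RxJava. The following is only a sketch under stated assumptions: the class RetryTrace and its method trace are hypothetical names, no real waiting takes place, and the code imitates the order of the log lines rather than how the operator is implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the example above; it does not use RxJava.
class RetryTrace {
    /**
     * Returns the log lines produced by a source that always fails when it is
     * retried with delays of 1..maxRetries seconds. Nothing actually sleeps.
     */
    static List<String> trace(int maxRetries) {
        List<String> log = new ArrayList<>();
        log.add("subscribing"); // initial subscription, which fails at once
        for (int i = 1; i <= maxRetries; i++) {
            // the i-th error is paired with the i-th value of range(1, maxRetries)
            log.add("delay retry by " + i + " second(s)");
            // when the timer fires, the operator resubscribes and the source fails again
            log.add("subscribing");
        }
        // range(1, maxRetries) is now exhausted, so the handler completes
        // and the sequence ends without another retry
        return log;
    }

    public static void main(String[] args) {
        trace(3).forEach(System.out::println);
    }
}
```

Each failure consumes one value from the range, which is why three delays lead to four subscriptions in total.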

Note that the inner ObservableSource returned by the handler function should signal either onNext, onError or onComplete in response to the received Throwable to indicate whether the operator should retry or terminate. If the upstream to the operator is asynchronous, signalling onNext followed immediately by onComplete may cause the sequence to complete immediately. Similarly, if this inner ObservableSource signals onError or onComplete while the upstream is active, the sequence is terminated with the same signal immediately.

The following example demonstrates how to retry an asynchronous source with a delay:

Observable.timer(1, TimeUnit.SECONDS)
    .doOnSubscribe(s -> System.out.println("subscribing"))
    .map(v -> { throw new RuntimeException(); })
    .retryWhen(errors -> {
        AtomicInteger counter = new AtomicInteger();
        return errors
            .takeWhile(e -> counter.getAndIncrement() != 3)
            .flatMap(e -> {
                System.out.println("delay retry by " + counter.get() + " second(s)");
                return Observable.timer(counter.get(), TimeUnit.SECONDS);
            });
    })
    .blockingSubscribe(System.out::println, System.out::println);
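For comparison, the same idea of retrying with a growing delay can be written as a blocking loop in plain Java. This is a minimal sketch, not RxJava's implementation: BlockingRetry, callWithRetry and the millisecond delay function are hypothetical names chosen for illustration. Unlike the example above, which completes quietly once takeWhile stops, this helper rethrows the last failure when the retries are used up.

```java
import java.util.concurrent.Callable;
import java.util.function.IntToLongFunction;

// Hypothetical blocking analogue of a delayed retry; it does not use RxJava.
class BlockingRetry {
    /**
     * Calls task until it succeeds or maxRetries retries have been used.
     * delayMillis maps the 1-based retry number to a wait in milliseconds.
     */
    static <T> T callWithRetry(Callable<T> task, int maxRetries, IntToLongFunction delayMillis)
            throws Exception {
        int retry = 0;
        while (true) {
            try {
                return task.call();
            } catch (Exception e) {
                if (retry == maxRetries) {
                    throw e; // retries exhausted: propagate the last failure
                }
                retry++;
                Thread.sleep(delayMillis.applyAsLong(retry)); // wait before the next attempt
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("attempt " + calls[0] + " failed");
            }
            return "ok after " + calls[0] + " attempts";
        }, 3, retry -> retry * 10L);
        System.out.println(result); // prints "ok after 3 attempts"
    }
}
```

Passing the delay as a function of the retry number keeps the policy (fixed, linear, exponential) separate from the loop, which plays the same role as the handler passed to retryWhen.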

Scheduler: retryWhen does not operate by default on a particular Scheduler.

Code examples

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void retryWhenFunctionNull() {
  just1.retryWhen(null);
}

Code example source: ReactiveX/RxJava

@Test
public void managerThrows() {
  Observable.just(1)
  .retryWhen(new Function<Observable<Throwable>, ObservableSource<Object>>() {
    @Override
    public ObservableSource<Object> apply(Observable<Throwable> v) throws Exception {
      throw new TestException();
    }
  })
  .test()
  .assertFailure(TestException.class);
}

Code example source: rengwuxian/RxJavaSamples

.retryWhen(new Function<Observable<? extends Throwable>, Observable<?>>() {
  @Override
  public Observable<?> apply(Observable<? extends Throwable> observable) {

Code example source: ReactiveX/RxJava

@Test
public void testOnErrorFromNotificationHandler() {
  Observer<String> observer = TestHelper.mockObserver();
  Observable<String> origin = Observable.unsafeCreate(new FuncWithErrors(2));
  origin.retryWhen(new Function<Observable<? extends Throwable>, Observable<?>>() {
    @Override
    public Observable<?> apply(Observable<? extends Throwable> t1) {
      return Observable.error(new RuntimeException());
    }
  }).subscribe(observer);
  InOrder inOrder = inOrder(observer);
  inOrder.verify(observer).onSubscribe((Disposable)notNull());
  inOrder.verify(observer, never()).onNext("beginningEveryTime");
  inOrder.verify(observer, never()).onNext("onSuccessOnly");
  inOrder.verify(observer, never()).onComplete();
  inOrder.verify(observer, times(1)).onError(any(RuntimeException.class));
  inOrder.verifyNoMoreInteractions();
}

Code example source: ReactiveX/RxJava

@Test
public void testOnCompletedFromNotificationHandler() {
  Observer<String> observer = TestHelper.mockObserver();
  Observable<String> origin = Observable.unsafeCreate(new FuncWithErrors(1));
  TestObserver<String> to = new TestObserver<String>(observer);
  origin.retryWhen(new Function<Observable<? extends Throwable>, Observable<?>>() {
    @Override
    public Observable<?> apply(Observable<? extends Throwable> t1) {
      return Observable.empty();
    }
  }).subscribe(to);
  InOrder inOrder = inOrder(observer);
  inOrder.verify(observer).onSubscribe((Disposable)notNull());
  inOrder.verify(observer, never()).onNext("beginningEveryTime");
  inOrder.verify(observer, never()).onNext("onSuccessOnly");
  inOrder.verify(observer, times(1)).onComplete();
  inOrder.verify(observer, never()).onError(any(Exception.class));
  inOrder.verifyNoMoreInteractions();
}

Code example source: ReactiveX/RxJava

@Test
public void testOnNextFromNotificationHandler() {
  Observer<String> observer = TestHelper.mockObserver();
  int numRetries = 2;
  Observable<String> origin = Observable.unsafeCreate(new FuncWithErrors(numRetries));
  origin.retryWhen(new Function<Observable<? extends Throwable>, Observable<Object>>() {
    @Override
    public Observable<Object> apply(Observable<? extends Throwable> t1) {
      return t1.map(new Function<Throwable, Integer>() {
        @Override
        public Integer apply(Throwable t1) {
          return 0;
        }
      }).startWith(0).cast(Object.class);
    }
  }).subscribe(observer);
  InOrder inOrder = inOrder(observer);
  // should show 3 attempts
  inOrder.verify(observer, times(numRetries + 1)).onNext("beginningEveryTime");
  // should have no errors
  inOrder.verify(observer, never()).onError(any(Throwable.class));
  // should have a single success
  inOrder.verify(observer, times(1)).onNext("onSuccessOnly");
  // should have a single successful onComplete
  inOrder.verify(observer, times(1)).onComplete();
  inOrder.verifyNoMoreInteractions();
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void retryWhenFunctionReturnsNull() {
  Observable.error(new TestException()).retryWhen(new Function<Observable<? extends Throwable>, Observable<Object>>() {
    @Override
    public Observable<Object> apply(Observable<? extends Throwable> f) {
      return null;
    }
  }).blockingSubscribe();
}

Code example source: ReactiveX/RxJava

@Test
public void shouldDisposeInnerObservable() {
  final PublishSubject<Object> subject = PublishSubject.create();
  final Disposable disposable = Observable.error(new RuntimeException("Leak"))
    .retryWhen(new Function<Observable<Throwable>, ObservableSource<Object>>() {
      @Override
      public ObservableSource<Object> apply(Observable<Throwable> errors) throws Exception {
        return errors.switchMap(new Function<Throwable, ObservableSource<Object>>() {
          @Override
          public ObservableSource<Object> apply(Throwable ignore) throws Exception {
            return subject;
          }
        });
      }
    })
    .subscribe();
  assertTrue(subject.hasObservers());
  disposable.dispose();
  assertFalse(subject.hasObservers());
}

Code example source: ReactiveX/RxJava

@Test
public void testSingleSubscriptionOnFirst() throws Exception {
  final AtomicInteger inc = new AtomicInteger(0);
  ObservableSource<Integer> onSubscribe = new ObservableSource<Integer>() {
    @Override
    public void subscribe(Observer<? super Integer> observer) {
      observer.onSubscribe(Disposables.empty());
      final int emit = inc.incrementAndGet();
      observer.onNext(emit);
      observer.onComplete();
    }
  };
  int first = Observable.unsafeCreate(onSubscribe)
      .retryWhen(new Function<Observable<? extends Throwable>, Observable<?>>() {
        @Override
        public Observable<?> apply(Observable<? extends Throwable> attempt) {
          return attempt.zipWith(Observable.just(1), new BiFunction<Throwable, Integer, Void>() {
            @Override
            public Void apply(Throwable o, Integer integer) {
              return null;
            }
          });
        }
      })
      .blockingFirst();
  assertEquals("Observer did not receive the expected output", 1, first);
  assertEquals("Subscribe was not called once", 1, inc.get());
}

Code example source: ReactiveX/RxJava

producer.retryWhen(new Function<Observable<? extends Throwable>, Observable<Object>>() {

Code example source: ReactiveX/RxJava

@Test
public void noCancelPreviousRepeatWhen2() {
  final AtomicInteger counter = new AtomicInteger();
  final AtomicInteger times = new AtomicInteger();
  Observable<Integer> source = Observable.<Integer>error(new TestException()).doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
      counter.getAndIncrement();
    }
  });
  source.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
    @Override
    public ObservableSource<?> apply(Observable<Throwable> e) throws Exception {
      return e.takeWhile(new Predicate<Object>() {
        @Override
        public boolean test(Object v) throws Exception {
          return times.getAndIncrement() < 4;
        }
      });
    }
  })
  .test()
  .assertResult();
  assertEquals(0, counter.get());
}

Code example source: ReactiveX/RxJava

Observable<String> origin = Observable.unsafeCreate(new FuncWithErrors(numRetries));
TestObserver<String> to = new TestObserver<String>(observer);
origin.retryWhen(new Function<Observable<? extends Throwable>, Observable<Object>>() {
  @Override
  public Observable<Object> apply(Observable<? extends Throwable> t1) {

Code example source: ReactiveX/RxJava

source.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
  @Override
  public ObservableSource<?> apply(Observable<Throwable> e) throws Exception {

Code example source: Polidea/RxAndroidBle

writeOperationAckStrategy, byteBuffer, emitterWrapper
))
.retryWhen(errorIsRetryableAndAccordingTo(writeOperationRetryStrategy, byteBuffer, batchSize))
.subscribe(new Observer<ByteAssociation<UUID>>() {
  @Override

Code example source: ReactiveX/RxJava

.retryWhen(new Function<Observable<Throwable>, ObservableSource<Integer>>() {
  @Override
  public ObservableSource<Integer> apply(Observable<Throwable> v)

Code example source: Polidea/RxAndroidBle

.retryWhen(errorNotificationHandler -> errorNotificationHandler),
sharedNotifyButtonClicks.compose(onSubscribeSetText(notifyButton, R.string.setup_notification)),
sharedNotifyButtonClicks.compose(onSubscribeSetText(notifyButton, R.string.setting_notification)),

Code example source: JessYanCoding/ArmsComponent

/**
 * Retry.
 * @param <T> the element type of the stream
 * @return a transformer that applies retryWhen with RetryWithDelay(2, 2)
 */
public static <T> ObservableTransformer<T, T> retry2() {
  return upstream -> upstream.retryWhen(new RetryWithDelay(2, 2));
}

Code example source: JessYanCoding/ArmsComponent

.retryWhen(new RetryWithDelay(3, 2)) // retry on error: the first argument is the number of retries, the second is the retry interval
.doOnSubscribe(disposable -> {
  if (pullToRefresh)

Code example source: commonsguy/cw-androidarch

Observable<String> getPassphrase(Uri source, final int count) {
 return(getWordsFromSource(source)
  .map(strings -> (randomSubset(strings, count)))
  .map(pieces -> TextUtils.join(" ", pieces))
  .flatMap(checker::validate)
  .retryWhen(errors -> errors.retry(3)));
}

Code example source: AriesHoo/FastLib

/**
 * Checks for an app update. Whether the local app's version information is sent depends on the specific API
 * (in this demo it does not need to be sent, with all decision logic placed on the app side, which is not recommended).
 *
 * @return an Observable that emits the update information
 */
public Observable<UpdateEntity> updateApp() {
  Map<String, Object> params = new HashMap<>(2);
  params.put("versionCode", FastUtil.getVersionCode(App.getContext()));
  params.put("versionName", FastUtil.getVersionName(App.getContext()));
  return FastTransformer.switchSchedulers(getApiService().updateApp(params).retryWhen(new FastRetryWhen()));
}
