Usage and code examples of the io.reactivex.Observable.repeatWhen() method

x33g5p2x, reposted on 2022-01-25 under Other

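This article collects code examples for the Java method io.reactivex.Observable.repeatWhen() and shows how Observable.repeatWhen() is used in practice. The examples come mainly from platforms such as GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are intended as a practical reference. Details of Observable.repeatWhen() are as follows:
Package path: io.reactivex.Observable
Class name: Observable
Method name: repeatWhen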

Introduction to Observable.repeatWhen

Returns an Observable that emits the same values as the source ObservableSource, with the exception of onComplete. An onComplete notification from the source results in the emission of a void item to the ObservableSource that is passed as the argument to the notificationHandler function. If the ObservableSource returned by that function calls onComplete or onError, repeatWhen calls onComplete or onError on the child subscription. Otherwise, the returned Observable resubscribes to the source ObservableSource.

Scheduler: repeatWhen does not operate by default on a particular Scheduler.
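
The contract described above can be pictured as a loop. The following is a minimal sketch in plain JDK Java, assuming a synchronous, single-threaded setting. It does not use RxJava and is not how the library implements the operator. The names `RepeatWhenModel`, `Signal`, and `run` are hypothetical and exist only for this illustration: the `handler` parameter stands in for the ObservableSource returned by the notificationHandler function, reduced to a decision made after each completion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.IntFunction;
import java.util.function.Supplier;

// Plain-JDK model of the repeatWhen contract; it does not use or reproduce RxJava.
class RepeatWhenModel {

    /** Hypothetical stand-in for what the handler's ObservableSource does after a completion. */
    enum Signal { REPEAT, COMPLETE, ERROR }

    /**
     * @param source  one call models one subscription: it returns the emitted items,
     *                or throws to model an onError from the source
     * @param handler maps the 1-based completion count to a Signal
     * @return the events a downstream observer would see, in order
     */
    static <T> List<String> run(Supplier<List<T>> source, IntFunction<Signal> handler) {
        List<String> events = new ArrayList<>();
        int completions = 0;
        while (true) {
            List<T> items;
            try {
                items = source.get();
            } catch (RuntimeException e) {
                events.add("onError");   // a source error goes straight downstream
                return events;
            }
            for (T item : items) {
                events.add("onNext(" + item + ")");
            }
            // The source's onComplete is not forwarded; the handler decides what happens next.
            Signal signal = handler.apply(++completions);
            if (signal == Signal.COMPLETE) {
                events.add("onComplete");
                return events;
            }
            if (signal == Signal.ERROR) {
                events.add("onError");
                return events;
            }
            // Signal.REPEAT: loop around and subscribe to the source again.
        }
    }

    public static void main(String[] args) {
        // Repeat once after the first completion, then complete.
        System.out.println(run(() -> Arrays.asList(1, 2, 3),
                n -> n < 2 ? Signal.REPEAT : Signal.COMPLETE));
        // prints [onNext(1), onNext(2), onNext(3), onNext(1), onNext(2), onNext(3), onComplete]
    }
}
```

In real RxJava code the handler is a stream transformation rather than a per-completion decision, so timing operators such as delay have no counterpart in this model.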

Code examples

Code example source: Polidea/RxAndroidBle

/**
 * A convenience function that creates a transformer which resubscribes to the source observable whenever it completes.
 *
 * @param <T> the type of the transformed observable
 * @return a transformer whose resulting observable never completes (the source is subscribed again on each completion)
 */
@NonNull
private static <T> ObservableTransformer<T, T> repeatAfterCompleted() {
  return observable -> observable.repeatWhen(completedNotification -> completedNotification);
}

Code example source: Polidea/RxAndroidBle

@Override
public Observable<RxBleInternalScanResult> apply(final Observable<RxBleInternalScanResult> rxBleInternalScanResultObservable) {
  // Take results for one window, then wait delayToNextWindow before resubscribing.
  return rxBleInternalScanResultObservable.take(windowInMillis, TimeUnit.MILLISECONDS, scheduler)
      .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(Observable<Object> observable) throws Exception {
          return observable.delay(delayToNextWindow, TimeUnit.MILLISECONDS, scheduler);
        }
      });
}
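
The timing that results from combining take(window) with a delayed repeatWhen can be worked out by hand. Below is an arithmetic sketch in plain JDK Java, assuming each window lasts exactly windowInMillis and the next subscription starts exactly delayToNextWindow after the previous one completes (scheduler overhead is ignored). The class `ScanWindowTimeline`, the method `windows`, and the sample values 3000 and 2000 are hypothetical and do not come from RxAndroidBle.

```java
import java.util.ArrayList;
import java.util.List;

// Arithmetic sketch only; it does not call RxJava or RxAndroidBle.
class ScanWindowTimeline {

    /**
     * Returns {start, end} offsets in milliseconds for the first `count` windows,
     * assuming each window lasts windowMillis and the next one starts
     * delayMillis after the previous one ends.
     */
    static List<long[]> windows(long windowMillis, long delayMillis, int count) {
        List<long[]> result = new ArrayList<>();
        long period = windowMillis + delayMillis;
        for (int i = 0; i < count; i++) {
            long start = i * period;
            result.add(new long[] {start, start + windowMillis});
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical values: 3 s scan window, 2 s pause between windows.
        for (long[] w : windows(3000, 2000, 3)) {
            System.out.println(w[0] + " ms .. " + w[1] + " ms");
        }
    }
}
```

With these sample values the windows would cover 0 to 3000 ms, 5000 to 8000 ms, and 10000 to 13000 ms.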

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void repeatWhenNull() {
  just1.repeatWhen(null);
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void repeatWhenFunctionReturnsNull() {
  just1.repeatWhen(new Function<Observable<Object>, Observable<Object>>() {
    @Override
    public Observable<Object> apply(Observable<Object> v) {
      return null;
    }
  }).blockingSubscribe();
}

Code example source: ReactiveX/RxJava

@Test
public void redoCancel() {
  final TestObserver<Integer> to = new TestObserver<Integer>();
  Observable.just(1)
  .repeatWhen(new Function<Observable<Object>, ObservableSource<Object>>() {
    @Override
    public ObservableSource<Object> apply(Observable<Object> o) throws Exception {
      return o.map(new Function<Object, Object>() {
        int count;
        @Override
        public Object apply(Object v) throws Exception {
          if (++count == 1) {
            to.cancel();
          }
          return v;
        }
      });
    }
  })
  .subscribe(to);
}

Code example source: Polidea/RxAndroidBle

.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
  @Override
  public ObservableSource<?> apply(Observable<Object> onWriteFinished) throws Exception {

Code example source: ReactiveX/RxJava

@Test
public void noCancelPreviousRepeatWhen() {
  final AtomicInteger counter = new AtomicInteger();
  Observable<Integer> source = Observable.just(1).doOnDispose(new Action() {
    @Override
    public void run() throws Exception {
      counter.getAndIncrement();
    }
  });
  final AtomicInteger times = new AtomicInteger();
  source.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
    @Override
    public ObservableSource<?> apply(Observable<Object> e) throws Exception {
      return e.takeWhile(new Predicate<Object>() {
        @Override
        public boolean test(Object v) throws Exception {
          return times.getAndIncrement() < 4;
        }
      });
    }
  })
  .test()
  .assertResult(1, 1, 1, 1, 1);
  assertEquals(0, counter.get());
}

Code example source: ReactiveX/RxJava

@Test
public void shouldDisposeInnerObservable() {
 final PublishSubject<Object> subject = PublishSubject.create();
 final Disposable disposable = Observable.just("Leak")
   .repeatWhen(new Function<Observable<Object>, ObservableSource<Object>>() {
    @Override
    public ObservableSource<Object> apply(Observable<Object> completions) throws Exception {
      return completions.switchMap(new Function<Object, ObservableSource<Object>>() {
        @Override
        public ObservableSource<Object> apply(Object ignore) throws Exception {
          return subject;
        }
      });
    }
  })
   .subscribe();
 assertTrue(subject.hasObservers());
 disposable.dispose();
 assertFalse(subject.hasObservers());
}

Code example source: ReactiveX/RxJava

@Test
public void whenTake() {
  Observable.range(1, 3).repeatWhen(new Function<Observable<Object>, ObservableSource<Object>>() {
    @Override
    public ObservableSource<Object> apply(Observable<Object> handler) throws Exception {
      return handler.take(2);
    }
  })
  .test()
  .assertResult(1, 2, 3, 1, 2, 3);
}

Code example source: ReactiveX/RxJava

@Test
public void handlerError() {
  Observable.range(1, 3)
  .repeatWhen(new Function<Observable<Object>, ObservableSource<Object>>() {
    @Override
    public ObservableSource<Object> apply(Observable<Object> v) throws Exception {
      return v.map(new Function<Object, Object>() {
        @Override
        public Object apply(Object w) throws Exception {
          throw new TestException();
        }
      });
    }
  })
  .test()
  .assertFailure(TestException.class, 1, 2, 3);
}

Code example source: ReactiveX/RxJava

@Test
public void testRepeatWhen() {
  Observable.error(new TestException())
  .repeatWhen(new Function<Observable<Object>, ObservableSource<Object>>() {
    @Override
    public ObservableSource<Object> apply(Observable<Object> v) throws Exception {
      return v.delay(10, TimeUnit.SECONDS);
    }
  })
  .test()
  .awaitDone(5, TimeUnit.SECONDS)
  .assertFailure(TestException.class);
}

Code example source: Polidea/RxAndroidBle

timeoutObservable
.repeatWhen(bufferIsNotEmptyAndOperationHasBeenAcknowledgedAndNotUnsubscribed(
    writeOperationAckStrategy, byteBuffer, emitterWrapper
))

Code example source: ReactiveX/RxJava

.repeatWhen(new Function<Observable<Object>, ObservableSource<Integer>>() {
  @Override
  public ObservableSource<Integer> apply(Observable<Object> v)

Code example source: imZeJun/RxSample

private void startAdvancePolling() {
  Log.d(TAG, "startAdvancePolling click");
  Observable<Long> observable = Observable.just(0L).doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      doWork();
    }
  }).repeatWhen(new Function<Observable<Object>, ObservableSource<Long>>() {
    private long mRepeatCount;
    @Override
    public ObservableSource<Long> apply(Observable<Object> objectObservable) throws Exception {
      // The handler must react to each notification; here that is done with the flatMap operator.
      return objectObservable.flatMap(new Function<Object, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Object o) throws Exception {
          if (++mRepeatCount > 4) {
            // return Observable.empty(); // Emits onComplete, which cannot trigger the downstream onComplete callback.
            return Observable.error(new Throwable("Polling work finished")); // Emits onError, which triggers the downstream onError callback.
          }
          }
          Log.d(TAG, "startAdvancePolling apply");
          return Observable.timer(3000 + mRepeatCount * 1000, TimeUnit.MILLISECONDS);
        }
      });
    }
  });
  DisposableObserver<Long> disposableObserver = getDisposableObserver();
  observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
  mCompositeDisposable.add(disposableObserver);
}
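
The delay arithmetic in the flatMap callback above can be checked in isolation. The following plain JDK sketch replays only the counter logic (`++mRepeatCount > 4` and `3000 + mRepeatCount * 1000`) and involves no RxJava. The class `PollingDelaySchedule`, the method `delays`, and the `maxRepeats` parameter are hypothetical names introduced for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Replays only the counter arithmetic from the flatMap callback; no RxJava involved.
class PollingDelaySchedule {

    /** Delays in milliseconds chosen after each completion, until the handler would error out. */
    static List<Long> delays(int maxRepeats) {
        List<Long> result = new ArrayList<>();
        long repeatCount = 0;
        while (true) {
            if (++repeatCount > maxRepeats) {
                break;                               // the snippet returns Observable.error(...) here
            }
            result.add(3000 + repeatCount * 1000);   // value passed to Observable.timer(...)
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(delays(4)); // prints [4000, 5000, 6000, 7000]
    }
}
```

Because the counter is incremented before it is used, the first repeat waits 4000 ms rather than 3000 ms, and the fifth completion is the one that ends the polling with an error.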

Code example source: Carson-Ho/RxJavaLearningMaterial

observable.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
  @Override

Code example source: Carson-Ho/RxJavaLearningMaterial

Observable.just(1,2,4).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
  @Override
  public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {

Code example source: akarnokd/akarnokd-misc

.repeatWhen(v -> v.zipWith(mayRepeat, (a, b) -> b))
