RxJava中的指数回退

nwlqm0z1  于 2023-01-15  发布在  Java
关注(0)|答案(3)|浏览(162)

我有一个API,它接受一个触发事件的Observable
我想返回一个Observable,如果检测到Internet连接,它每隔defaultDelay秒发送一个值,如果没有连接,它将延迟numberOfFailedAttempts^2次。
我尝试了很多不同的风格,最大的问题是retryWhen's observable只计算一次:

Observable
    .interval(defaultDelay,TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.io())
    .repeatWhen((observable) ->
         observable.concatMap(repeatObservable -> {
             if(internetConnectionDetector.isInternetConnected()){
                 consecutiveRetries = 0;
                 return observable;
             } else {
                 consecutiveRetries++;
                 int backoffDelay = (int)Math.pow(consecutiveRetries,2);
                 return observable.delay(backoffDelay, TimeUnit.SECONDS);
                }
         }).onBackpressureDrop())
    .onBackpressureDrop();

有什么方法可以完成我正在尝试的任务吗?我找到了一个相关的问题(现在搜索时找不到),但所采用的方法似乎不适用于动态值。

mwngjboj

mwngjboj1#

在您的代码中有两个错误:
1.为了重复一些可观察到的序列,该序列必须是有限的。也就是说,你最好使用类似justfromCallable的东西来代替interval,就像我在下面的示例中所做的那样。
1.从repeatWhen的内部函数,你需要返回新的延迟可观察源,所以你必须返回Observable.timer()而不是observable.delay()
工作代码:

public void testRepeat() throws InterruptedException {
    logger.info("test start");

    int DEFAULT_DELAY = 100; // ms
    int ADDITIONAL_DELAY = 100; // ms
    AtomicInteger generator = new AtomicInteger(0);
    AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive

    Disposable subscription = Observable.fromCallable(generator::incrementAndGet)
            .repeatWhen(counts -> {
                AtomicInteger retryCounter = new AtomicInteger(0);
                return counts.flatMap(c -> {
                    int retry = 0;
                    if (connectionAlive.get()) {
                        retryCounter.set(0); // reset counter
                    } else {
                        retry = retryCounter.incrementAndGet();
                    }
                    int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry, 2);
                    logger.info("retry={}, additionalDelay={}ms", retry, additionalDelay);
                    return Observable.timer(DEFAULT_DELAY + additionalDelay, TimeUnit.MILLISECONDS);
                });
            })
            .subscribe(v -> logger.info("got {}", v));

    Thread.sleep(220);
    logger.info("connection dropped");
    connectionAlive.set(false);
    Thread.sleep(2000);
    logger.info("connection is back alive");
    connectionAlive.set(true);
    Thread.sleep(2000);
    subscription.dispose();
    logger.info("test complete");
}

请参阅有关repeatWhenhere的详细文章。

1u4esq0p

1u4esq0p2#

我一直觉得retryWhen有点低级,所以对于指数回退,我使用一个构建器(如Abhijit),它经过单元测试,可用于RxJava 1.x的rxjava-extras。我建议使用一个封顶版本,这样延迟的指数增长不会超过您定义的最大值。
这是你如何使用它:

observable.retryWhen(
    RetryWhen.exponentialBackoff(
        delay, maxDelay, TimeUNIT.SECONDS)
    .build());

我不同意retryWhen有bug,但是如果你发现bug,报告给RxJava。bug很快就被修复了!
您需要rxjava-extras0.8.0.6或更高版本,该版本位于Maven Central上:

<dependency>
    <groupId>com.github.davidmoten</groupId>
    <artifactId>rxjava-extras</artifactId>
    <version>0.8.0.6</version>
</dependency>

如果您需要RxJava2.x版本,请告诉我。从0.1.4开始,rxjava2-extras中提供了相同的功能。

41zrol4v

41zrol4v3#

您可以使用retryWhen操作符来配置没有连接时的延迟。如何定期发射项目是一个单独的主题(查找intervaltimer操作符)。如果您无法理解,请打开一个单独的问题。
我有一个关于Github的广泛的例子,但我在这里给予你要点。

RetryWithDelay retryWithDelay = RetryWithDelay.builder()
    .retryDelayStrategy(RetryDelayStrategy.RETRY_COUNT)
    .build()

Single.fromCallable(() -> {
    ...
}).retryWhen(retryWithDelay)
.subscribe(j -> {
    ...
})

RetryWithDelay的定义如下,我使用的是RxJava 2.x,所以如果你使用的是1.x,那么签名应该是Func1<Observable<? extends Throwable>, Observable<Object>>

public class RetryWithDelay implements
        Function<Flowable<? extends Throwable>, Publisher<Object>> {
    ...
}

RetryWithDelay类。
RetryStrategy枚举
这允许我基于RetryDelayStrategy配置各种类型的超时,常量、线性、指数。对于您的用例,您可以选择CONSTANT_DELAY_TIMES_RETRY_COUNT延迟策略,并在构建RetryWithDelay时调用retryDelaySeconds(2)
retryWhen是一个复杂的,甚至可能是有缺陷的操作符。你可以在网上找到的大多数例子都使用range操作符,如果没有重试,它将失败。详细信息请参见我的答案here

相关问题