我有一个API,它接受一个触发事件的Observable
。
我想返回一个Observable
,如果检测到Internet连接,它每隔defaultDelay
秒发送一个值,如果没有连接,它将延迟numberOfFailedAttempts^2
次。
我尝试了很多不同的风格,最大的问题是retryWhen's
observable只计算一次:
Observable
.interval(defaultDelay,TimeUnit.MILLISECONDS)
.observeOn(Schedulers.io())
.repeatWhen((observable) ->
observable.concatMap(repeatObservable -> {
if(internetConnectionDetector.isInternetConnected()){
consecutiveRetries = 0;
return observable;
} else {
consecutiveRetries++;
int backoffDelay = (int)Math.pow(consecutiveRetries,2);
return observable.delay(backoffDelay, TimeUnit.SECONDS);
}
}).onBackpressureDrop())
.onBackpressureDrop();
有什么方法可以完成我正在尝试的任务吗?我找到了一个相关的问题(现在搜索时找不到),但所采用的方法似乎不适用于动态值。
3条答案
按热度按时间mwngjboj1#
在您的代码中有两个错误:
1.为了重复一些可观察到的序列,该序列必须是有限的。也就是说,你最好使用类似
just
或fromCallable
的东西来代替interval
,就像我在下面的示例中所做的那样。1.从
repeatWhen
的内部函数,你需要返回新的延迟可观察源,所以你必须返回Observable.timer()
而不是observable.delay()
。工作代码:
请参阅有关
repeatWhen
here的详细文章。1u4esq0p2#
我一直觉得
retryWhen
有点低级,所以对于指数回退,我使用一个构建器(如Abhijit),它经过单元测试,可用于RxJava 1.x的rxjava-extras。我建议使用一个封顶版本,这样延迟的指数增长不会超过您定义的最大值。这是你如何使用它:
我不同意
retryWhen
有bug,但是如果你发现bug,报告给RxJava。bug很快就被修复了!您需要
rxjava-extras
0.8.0.6或更高版本,该版本位于Maven Central上:如果您需要RxJava2.x版本,请告诉我。从0.1.4开始,rxjava2-extras中提供了相同的功能。
41zrol4v3#
您可以使用
retryWhen
操作符来配置没有连接时的延迟。如何定期发射项目是一个单独的主题(查找interval
或timer
操作符)。如果您无法理解,请打开一个单独的问题。我有一个关于Github的广泛的例子,但我在这里给予你要点。
RetryWithDelay
的定义如下,我使用的是RxJava 2.x,所以如果你使用的是1.x,那么签名应该是Func1<Observable<? extends Throwable>, Observable<Object>>
。RetryWithDelay类。
RetryStrategy枚举
这允许我基于
RetryDelayStrategy
配置各种类型的超时,常量、线性、指数。对于您的用例,您可以选择CONSTANT_DELAY_TIMES_RETRY_COUNT
延迟策略,并在构建RetryWithDelay
时调用retryDelaySeconds(2)
。retryWhen
是一个复杂的,甚至可能是有缺陷的操作符。你可以在网上找到的大多数例子都使用range
操作符,如果没有重试,它将失败。详细信息请参见我的答案here。