作者热门文章
- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我有一个 API,它采用触发事件的 Observable
。
我想返回一个 Observable
,如果检测到 Internet 连接,它每 defaultDelay
秒发出一个值,并延迟 numberOfFailedAttempts^2
次如果没有连接。
我尝试了很多不同的样式,我遇到的最大问题是retryWhen's
observable is only evaluated once:
Observable
.interval(defaultDelay,TimeUnit.MILLISECONDS)
.observeOn(Schedulers.io())
.repeatWhen((observable) ->
observable.concatMap(repeatObservable -> {
if(internetConnectionDetector.isInternetConnected()){
consecutiveRetries = 0;
return observable;
} else {
consecutiveRetries++;
int backoffDelay = (int)Math.pow(consecutiveRetries,2);
return observable.delay(backoffDelay, TimeUnit.SECONDS);
}
}).onBackpressureDrop())
.onBackpressureDrop();
有什么方法可以做我想做的事吗?我发现了一个相关问题(现在无法搜索),但所采用的方法似乎不适用于动态值。
最佳答案
在你的代码中有两个错误:
interval
你最好使用像 just
或 fromCallable
之类的东西,就像我在下面的示例中所做的那样。repeatWhen
的内部函数中,您需要返回新的延迟可观察源,因此您必须返回 Observable.timer 而不是 observable.delay()
()
。工作代码:
public void testRepeat() throws InterruptedException {
logger.info("test start");
int DEFAULT_DELAY = 100; // ms
int ADDITIONAL_DELAY = 100; // ms
AtomicInteger generator = new AtomicInteger(0);
AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive
Disposable subscription = Observable.fromCallable(generator::incrementAndGet)
.repeatWhen(counts -> {
AtomicInteger retryCounter = new AtomicInteger(0);
return counts.flatMap(c -> {
int retry = 0;
if (connectionAlive.get()) {
retryCounter.set(0); // reset counter
} else {
retry = retryCounter.incrementAndGet();
}
int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry, 2);
logger.info("retry={}, additionalDelay={}ms", retry, additionalDelay);
return Observable.timer(DEFAULT_DELAY + additionalDelay, TimeUnit.MILLISECONDS);
});
})
.subscribe(v -> logger.info("got {}", v));
Thread.sleep(220);
logger.info("connection dropped");
connectionAlive.set(false);
Thread.sleep(2000);
logger.info("connection is back alive");
connectionAlive.set(true);
Thread.sleep(2000);
subscription.dispose();
logger.info("test complete");
}
请参阅有关repeatWhen
的详细文章 here .
关于java - RxJava 中的指数退避,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41678918/
我试图了解 Spring WebClient Retry.backoff 方法的指数退避策略的默认乘数。并且这个可以配置吗?我找不到这方面的文档。 https://projectreactor.io/
我是一名优秀的程序员,十分优秀!