gpt4 book ai didi

java - RxJava 中的指数退避

转载 作者:塔克拉玛干 更新时间:2023-11-01 22:22:13 25 4
gpt4 key购买 nike

我有一个 API,它采用触发事件的 Observable

我想返回一个 Observable,如果检测到 Internet 连接,它每 defaultDelay 秒发出一个值,并延迟 numberOfFailedAttempts^2 次如果没有连接。

我尝试了很多不同的样式,我遇到的最大问题是retryWhen's observable is only evaluated once:

Observable
.interval(defaultDelay,TimeUnit.MILLISECONDS)
.observeOn(Schedulers.io())
.repeatWhen((observable) ->
observable.concatMap(repeatObservable -> {
if(internetConnectionDetector.isInternetConnected()){
consecutiveRetries = 0;
return observable;
} else {
consecutiveRetries++;
int backoffDelay = (int)Math.pow(consecutiveRetries,2);
return observable.delay(backoffDelay, TimeUnit.SECONDS);
}
}).onBackpressureDrop())
.onBackpressureDrop();

有什么方法可以做我想做的事吗?我发现了一个相关问题(现在无法搜索),但所采用的方法似乎不适用于动态值。

最佳答案

在你的代码中有两个错误:

  1. 为了重复某些可观察的序列,该序列必须是有限的。 IE。而不是 interval 你最好使用像 justfromCallable 之类的东西,就像我在下面的示例中所做的那样。
  2. repeatWhen 的内部函数中,您需要返回新的延迟可观察源,因此您必须返回 Observable.timer 而不是 observable.delay() ()

工作代码:

public void testRepeat() throws InterruptedException {
logger.info("test start");

int DEFAULT_DELAY = 100; // ms
int ADDITIONAL_DELAY = 100; // ms
AtomicInteger generator = new AtomicInteger(0);
AtomicBoolean connectionAlive = new AtomicBoolean(true); // initially alive

Disposable subscription = Observable.fromCallable(generator::incrementAndGet)
.repeatWhen(counts -> {
AtomicInteger retryCounter = new AtomicInteger(0);
return counts.flatMap(c -> {
int retry = 0;
if (connectionAlive.get()) {
retryCounter.set(0); // reset counter
} else {
retry = retryCounter.incrementAndGet();
}
int additionalDelay = ADDITIONAL_DELAY * (int) Math.pow(retry, 2);
logger.info("retry={}, additionalDelay={}ms", retry, additionalDelay);
return Observable.timer(DEFAULT_DELAY + additionalDelay, TimeUnit.MILLISECONDS);
});
})
.subscribe(v -> logger.info("got {}", v));

Thread.sleep(220);
logger.info("connection dropped");
connectionAlive.set(false);
Thread.sleep(2000);
logger.info("connection is back alive");
connectionAlive.set(true);
Thread.sleep(2000);
subscription.dispose();
logger.info("test complete");
}

请参阅有关repeatWhen 的详细文章 here .

关于java - RxJava 中的指数退避,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41678918/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com