gpt4 book ai didi

java - RxJava retryWhen 异常行为

转载 作者:搜寻专家 更新时间:2023-10-31 20:32:39 24 4
gpt4 key购买 nike

我正在玩 RxJava retryWhen运算符(operator)。在互联网上很少找到关于它的信息,唯一值得一提的是 this .这也不足以探索我想了解的各种用例。我还加入了异步执行并通过回退重试,以使其更加真实。

我的设置很简单:我有一个类 ChuckNorrisJokesRepository,它从 JSON 文件返回随机数的 Chuck Norris 笑话。我的测试类是 ChuckNorrisJokesService,如下所示。我感兴趣的用例如下:

  1. 第一次尝试成功(不重试)
  2. 1 次重试后失败
  3. 尝试重试 3 次但在第 2 次成功,因此不会重试第 3 次
  4. 第 3 次重试成功

注意:项目在我的GitHub上可用.

ChuckNorrisJokesService.java:

@Slf4j
@Builder
public class ChuckNorrisJokesService {
@Getter
private final AtomicReference<Jokes> jokes = new AtomicReference<>(new Jokes());

private final Scheduler scheduler;
private final ChuckNorrisJokesRepository jokesRepository;
private final CountDownLatch latch;
private final int numRetries;
private final Map<String, List<String>> threads;

public static class ChuckNorrisJokesServiceBuilder {
public ChuckNorrisJokesService build() {
if (scheduler == null) {
scheduler = Schedulers.io();
}

if (jokesRepository == null) {
jokesRepository = new ChuckNorrisJokesRepository();
}

if (threads == null) {
threads = new ConcurrentHashMap<>();
}

requireNonNull(latch, "CountDownLatch must not be null.");

return new ChuckNorrisJokesService(scheduler, jokesRepository, latch, numRetries, threads);
}
}

public void setRandomJokes(int numJokes) {
mergeThreadNames("getRandomJokes");

Observable.fromCallable(() -> {
log.debug("fromCallable - before call. Latch: {}.", latch.getCount());
mergeThreadNames("fromCallable");
latch.countDown();

List<Joke> randomJokes = jokesRepository.getRandomJokes(numJokes);
log.debug("fromCallable - after call. Latch: {}.", latch.getCount());

return randomJokes;
}).retryWhen(errors ->
errors.zipWith(Observable.range(1, numRetries), (n, i) -> i).flatMap(retryCount -> {
log.debug("retryWhen. retryCount: {}.", retryCount);
mergeThreadNames("retryWhen");

return Observable.timer(retryCount, TimeUnit.SECONDS);
}))
.subscribeOn(scheduler)
.subscribe(j -> {
log.debug("onNext. Latch: {}.", latch.getCount());
mergeThreadNames("onNext");

jokes.set(new Jokes("success", j));
latch.countDown();
},
ex -> {
log.error("onError. Latch: {}.", latch.getCount(), ex);
mergeThreadNames("onError");
},
() -> {
log.debug("onCompleted. Latch: {}.", latch.getCount());
mergeThreadNames("onCompleted");

latch.countDown();
}
);
}

private void mergeThreadNames(String methodName) {
threads.merge(methodName,
new ArrayList<>(Arrays.asList(Thread.currentThread().getName())),
(value, newValue) -> {
value.addAll(newValue);

return value;
});
}
}

为简洁起见,我将只展示第一个用例的 Spock 测试用例。看我的GitHub对于其他测试用例。

def "succeeds on 1st attempt"() {
setup:
CountDownLatch latch = new CountDownLatch(2)
Map<String, List<String>> threads = Mock(Map)
ChuckNorrisJokesService service = ChuckNorrisJokesService.builder()
.latch(latch)
.threads(threads)
.build()

when:
service.setRandomJokes(3)
latch.await(2, TimeUnit.SECONDS)

Jokes jokes = service.jokes.get()

then:
jokes.status == 'success'
jokes.count() == 3

1 * threads.merge('getRandomJokes', *_)
1 * threads.merge('fromCallable', *_)
0 * threads.merge('retryWhen', *_)
1 * threads.merge('onNext', *_)
0 * threads.merge('onError', *_)
1 * threads.merge('onCompleted', *_)
}

这失败了:

Too few invocations for:

1 * threads.merge('fromCallable', *_) (0 invocations)
1 * threads.merge('onNext', *_) (0 invocations)

我期望的是 fromCallable 被调用一次,它成功,onNext 被调用一次,然后是 onCompleted。我错过了什么?

P.S.:全面披露 - 我还在 RxJava GitHub 上发布了这个问题.

最佳答案

经过几个小时的故障排除并在 ReactiveX 成员 David Karnok 的帮助下,我解决了这个问题。

retryWhen 是一个复杂的,甚至可能有错误的运算符。官方文档和此处至少有一个答案使用 range 运算符,如果没有重试,它会立即完成。看我的discussion与 David Karnok。

代码可以在我的GitHub 上找到完成以下测试用例:

  1. 第一次尝试成功(不重试)
  2. 1 次重试后失败
  3. 尝试重试 3 次但在第 2 次成功,因此不会重试第 3 次
  4. 第 3 次重试成功

关于java - RxJava retryWhen 异常行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38416861/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com