gpt4 book ai didi

java - RxJava : retrying map actions

转载 作者:行者123 更新时间:2023-12-01 11:29:28 25 4
gpt4 key购买 nike

我有一个 Observable,其中每个项目都以可能导致异常的方式进行转换,但可以重试。我不希望失败破坏流,因为每个项目都代表一个独立的事务。我能想到的最佳解决方案是:

    final AtomicLong errCount = new AtomicLong();
Observable.from(ImmutableList.of(1L, 2L, 3L)).flatMap(new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
return Observable.from(ImmutableList.of(aLong)).map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
if (aLong == 2 && errCount.getAndIncrement() < 1) {
throw new RuntimeException("retryable error");
}
return aLong * 100;
}
}).retry(2);
}
}).forEach(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println(aLong);
}
});

// Desired output: 100, 200, 300 (not 100, 100, 200, 300)

问题:

  • 重试逻辑非常冗长。
  • 如果任何项目在 2 次重试后失败,则流将中断(不再处理更多项目)。我想要一种干净的方式来返回异常和结果,例如 Finagle 的 Try,这样我就可以处理所有异常。

最佳答案

The retry logic is really verbose.

您可以避免使用ImmutableList完全通过切换到 Observable.just(t1, t2, t3)构造函数。这本质上是做同样的事情,但更简洁。

我看到你正在使用 flatMapping 来将每个值转换为 Observable。这将防止在映射单个值时遇到 onError 取消订阅整个链。因此,当运算符抛出异常时,它将仅取消订阅该值的内部可观察链。否则,错误将导致从主要外部可观察对象取消订阅并重新订阅。

如果您想保持这种行为但减少样板(除了明显切换到 Java8 lambda 之外),我可以想到两种选择。

首先,在重试后重新订阅并删除重复数据您的数据。如果你有一个很好的值hashcodeequals实现中,您可以使用过滤器附加到有状态集,并且仅当该集尚未包含该值时才使用 onNext。

Observable.<Long> just(1L, 2L, 3L)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
if (aLong == 2 && errCount.getAndIncrement() < 1) {
throw new RuntimeException("retryable error");
}
return aLong * 100;
}})
.retry(2)
.filter(new Func1<Long, Boolean>() {
Set<Long> state = null;

@Override
public Boolean call(Long a) {
if (state == null)
state = new HashSet<Long>();
if (!state.contains(a)) {
state.add(a);
return true;
}
return false;
}})
.forEach(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println(aLong);
}});

其次,您可以将您的可观察对象重新订阅时从上次停止的位置恢复。请注意,当使用缓冲运算符(observeOn、merge、flatMap)时,可能会出现数据丢失的问题。这是因为他们将继续以与下游消费者脱钩的方式从生产者那里消费。因此,您需要确保在重试之前不进行缓冲。如果您正在实现支持背压的可观察源,还需要考虑其他因素。

// Should resume right where it left off
resumableObservable.map(...).retry(2).observeOn()

// Don't do this. ObserveOn will buffer values and resume will lose data.
resumableObservable.map(...).observeOn().retry(2)

// Also bad if running async observables. Merging buffers so this could have data loss.
Observable.merge(resumableObservable.map(...)).retry(2)

If any item fails after 2 retries, the stream is broken (no more items are processed). I'd like a clean way to return both exceptions and results like Finagle's Try, so I can process all the exceptions.

您可以从 Long -> Long 更改不可靠的 map 到 Long -> Tuple<Long, List<Exception>> 。由于这是一大堆泛型并且很快就会变得很麻烦,我建议使用重试运算符的不同变体,即 retryWhen(Func1<Observable<Throwable>, Observable<?>>) 。以下是如何在代码中使用它的示例。

}).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>(){
@Override
public Observable<?> call(Observable<? extends Throwable> o) {
final AtomicInteger count = new AtomicInteger();
return o.filter(new Func1<Throwable, Boolean>() {
@Override
public Boolean call(Throwable t) {
return t instanceof RuntimeException || count.getAndIncrement() < 5;
}}).delay(1, TimeUnit.SECONDS, Schedulers.immediate());
}})

使用 retryWhen 的好处是,您可以轻松地以非阻塞方式在一段时间后实现延迟重试。

关于java - RxJava : retrying map actions,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30533082/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com