gpt4 book ai didi

java - Reactor - 如何在不丢弃元素的情况下重试热通量?

转载 作者:行者123 更新时间:2023-11-30 10:00:53 24 4
gpt4 key购买 nike

我有无穷无尽的热流数据。我即将对流中的每个元素执行一个操作,每个元素返回一个 Mono,它将在有限的时间后完成(以某种方式)。

这些操作可能会引发错误。如果是这样,我想重新订阅热通量而不遗漏任何东西,重试抛出错误时正在处理的元素(即任何未成功完成的元素)。

我在这里做什么?我可以容忍对相同元素的重复操作,但不能完全从流中丢失元素。

我已经尝试使用 ReplayProcessor 来处理这个问题,但是我看不出有什么方法可以让它工作,除非重复很多很可能已经成功的操作(使用非常保守的超时),或者丢失元素新元素覆盖缓冲区中的旧元素(如下所示)。

测试用例:


@Test
public void fluxTest() {


List<String> strings = new ArrayList<>();
strings.add("one");
strings.add("two");
strings.add("three");
strings.add("four");

ConnectableFlux<String> flux = Flux.fromIterable(strings).publish();

//Goes boom after three uses of its method, otherwise
//returns a mono. completing after a little time
DangerousClass dangerousClass = new DangerousClass(3);

ReplayProcessor<String> replay = ReplayProcessor.create(3);

flux.subscribe(replay);

replay.flatMap(dangerousClass::doThis)
.retry(1)
.doOnNext(s -> LOG.info("Completed {}", s))
.subscribe();

flux.connect();

flux.blockLast();
}


public class DangerousClass {

Logger LOG = LoggerFactory.getLogger(DangerousClass.class);

private int boomCount;
private AtomicInteger count;

public DangerousClass(int boomCount) {
this.boomCount = boomCount;
this.count = new AtomicInteger(0);
}

public Mono<String> doThis(String s) {
return Mono.fromSupplier(() -> {
LOG.info("doing dangerous {}", s);
if (count.getAndIncrement() == boomCount) {
LOG.error("Throwing exception from {}", s);
throw new RuntimeException("Boom!");
}
return s;
}).delayElement(Duration.ofMillis(600));
}
}

这打印:

doing dangerous one
doing dangerous two
doing dangerous three
doing dangerous four
Throwing exception from four
doing dangerous two
doing dangerous three
doing dangerous four
Completed four
Completed two
Completed three

一个永远不会完成。

最佳答案

错误(至少在上面的示例中)只能发生在 flatMap(dangerousClass::doThis) 调用中 - 因此重新订阅根 Flux 并重放元素当这个 flatMap() 调用失败时似乎有点奇怪,并且(可能)不是您想要做的。

相反,我建议放弃 ReplayProcessor 并只在内部 flatMap() 调用上调用重试,这样你最终会得到如下结果:

ConnectableFlux<String> flux = Flux.range(1, 10).map(n -> "Entry " + n).publish();

DangerousClass dangerousClass = new DangerousClass(3);

flux.flatMap(x -> dangerousClass.doThis(x).retry(1))
.doOnNext(s -> System.out.println("Completed " + s))
.subscribe();

flux.connect();

这将为您提供如下内容,所有条目均已完成且无需重试:

doing dangerous Entry 1
doing dangerous Entry 2
doing dangerous Entry 3
doing dangerous Entry 4
Throwing exception from Entry 4
doing dangerous Entry 4
Completed Entry 2
Completed Entry 1
Completed Entry 3
Completed Entry 4

关于java - Reactor - 如何在不丢弃元素的情况下重试热通量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57796734/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com