- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有无穷无尽的热流数据。我即将对流中的每个元素执行一个操作,每个元素返回一个 Mono,它将在有限的时间后完成(以某种方式)。
这些操作可能会引发错误。如果是这样,我想重新订阅热通量而不遗漏任何东西,重试抛出错误时正在处理的元素(即任何未成功完成的元素)。
我在这里做什么?我可以容忍对相同元素的重复操作,但不能完全从流中丢失元素。
我已经尝试使用 ReplayProcessor 来处理这个问题,但是我看不出有什么方法可以让它工作,除非重复很多很可能已经成功的操作(使用非常保守的超时),或者丢失元素新元素覆盖缓冲区中的旧元素(如下所示)。
测试用例:
@Test
public void fluxTest() {
List<String> strings = new ArrayList<>();
strings.add("one");
strings.add("two");
strings.add("three");
strings.add("four");
ConnectableFlux<String> flux = Flux.fromIterable(strings).publish();
//Goes boom after three uses of its method, otherwise
//returns a mono. completing after a little time
DangerousClass dangerousClass = new DangerousClass(3);
ReplayProcessor<String> replay = ReplayProcessor.create(3);
flux.subscribe(replay);
replay.flatMap(dangerousClass::doThis)
.retry(1)
.doOnNext(s -> LOG.info("Completed {}", s))
.subscribe();
flux.connect();
flux.blockLast();
}
public class DangerousClass {
Logger LOG = LoggerFactory.getLogger(DangerousClass.class);
private int boomCount;
private AtomicInteger count;
public DangerousClass(int boomCount) {
this.boomCount = boomCount;
this.count = new AtomicInteger(0);
}
public Mono<String> doThis(String s) {
return Mono.fromSupplier(() -> {
LOG.info("doing dangerous {}", s);
if (count.getAndIncrement() == boomCount) {
LOG.error("Throwing exception from {}", s);
throw new RuntimeException("Boom!");
}
return s;
}).delayElement(Duration.ofMillis(600));
}
}
这打印:
doing dangerous one
doing dangerous two
doing dangerous three
doing dangerous four
Throwing exception from four
doing dangerous two
doing dangerous three
doing dangerous four
Completed four
Completed two
Completed three
一个永远不会完成。
最佳答案
错误(至少在上面的示例中)只能发生在 flatMap(dangerousClass::doThis)
调用中 - 因此重新订阅根 Flux
并重放元素当这个 flatMap()
调用失败时似乎有点奇怪,并且(可能)不是您想要做的。
相反,我建议放弃 ReplayProcessor
并只在内部 flatMap()
调用上调用重试,这样你最终会得到如下结果:
ConnectableFlux<String> flux = Flux.range(1, 10).map(n -> "Entry " + n).publish();
DangerousClass dangerousClass = new DangerousClass(3);
flux.flatMap(x -> dangerousClass.doThis(x).retry(1))
.doOnNext(s -> System.out.println("Completed " + s))
.subscribe();
flux.connect();
这将为您提供如下内容,所有条目均已完成且无需重试:
doing dangerous Entry 1
doing dangerous Entry 2
doing dangerous Entry 3
doing dangerous Entry 4
Throwing exception from Entry 4
doing dangerous Entry 4
Completed Entry 2
Completed Entry 1
Completed Entry 3
Completed Entry 4
关于java - Reactor - 如何在不丢弃元素的情况下重试热通量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57796734/
我是 Java 新手,这是我的代码, if( a.name == b.name && a.displayname == b.displayname && a.linknam
在下面的场景中,我有一个 bool 值。根据结果,我调用完全相同的函数,唯一的区别是参数的数量。 var myBoolean = ... if (myBoolean) { retrieve
我是一名研究 C++ 的 C 开发人员: 我是否正确理解如果我抛出异常然后堆栈将展开直到找到第一个异常处理程序?是否可以在不展开的情况下在任何 throw 上打开调试器(即不离开声明它的范围或任何更高
在修复庞大代码库中的错误时,我观察到一个奇怪的情况,其中引用的动态类型从原始 Derived 类型更改为 Base 类型!我提供了最少的代码来解释问题: struct Base { // some
我正在尝试用 C# 扩展给定的代码,但由于缺乏编程经验,我有点陷入困境。 使用 Visual Studio 社区,我尝试通过控制台读出 CPU 核心温度。该代码使用开关/外壳来查找传感器的特定名称(即
这可能是一个哲学问题。 假设您正在向页面发出 AJAX 请求(这是使用 Prototype): new Ajax.Request('target.asp', { method:"post", pa
我有以下 HTML 代码,我无法在所有浏览器中正常工作: 我试图在移动到
我对 Swift 很陌生。我如何从 addPin 函数中检索注释并能够在我的 addLocation 操作 (buttonPressed) 中使用它。我正在尝试使用压力触摸在 map 上添加图钉,在两
我设置了一个详细 View ,我是否有几个 Nib 文件根据在 Root View Controller 的表中选择的项目来加载。 我发现,对于 Nibs 的类,永远不会调用 viewDidUnloa
我需要动态访问 json 文件并使用以下代码。在本例中,“bpicsel”和“temp”是变量。最终结果类似于“data[0].extit1” var title="data["+bpicsel+"]
我需要使用第三方 WCF 服务。我已经在我的证书存储中配置了所需的证书,但是在调用 WCF 服务时出现以下异常。 向 https://XXXX.com/AHSharedServices/Custome
在几个 SO 答案(1、2)中,建议如果存在冲突则不应触发 INSERT 触发器,ON CONFLICT DO NOTHING 在触发语句中。也许我理解错了,但在我的实验中似乎并非如此。 这是我的 S
如果进行修改,则会给出org.hibernate.NonUniqueObjectException。在我的 BidderBO 类(class)中 @Override @Transactional(pr
我使用 indexOf() 方法来精细地查找数组中的对象。 直到此刻我查了一些资料,发现代码应该无法正常工作。 我在reducer中尝试了上面的代码,它成功了 let tmp = state.find
假设我有以下表格: CREATE TABLE Game ( GameID INT UNSIGNED NOT NULL, GameType TINYINT UNSIGNED NOT NU
代码: Alamofire.request(URL(string: imageUrl)!).downloadProgress(closure: { (progress) in
我是一名优秀的程序员,十分优秀!