gpt4 book ai didi

java - Rx-java 2 : How to set priority between Observables?

转载 作者:太空宇宙 更新时间:2023-11-04 09:51:53 25 4
gpt4 key购买 nike

我有一个基于时间的算法,在某些情况下,不同的可观测值可能在同一时刻发生两次发射。

请参阅下面的代码:

   int moment = ...; //calculated
Observable.timer(moment, unit)
.takeUntil(messages)
.map(e -> new Action())

messages Observable 在 Schedulers.computation() 上执行,就像计时器一样。

当消息与计时器同时到达时,行为是随机的。有时会发出新的 Action,有时则不会,takeUntil(messages) 优先。

我需要完全控制这种行为。当然,我可以在 moment 中添加几毫秒,这样它就总是在 messages 之后执行,但这不是我想要的。

如何告诉 RxJava 可观察的消息优先于计时器?

更新:

为了澄清问题,我添加了这个测试,每次执行它都会成功

    //before launching the test
RxJavaPlugins.setComputationSchedulerHandler(scheduler ->
new TestScheduler());


@Test
public void rx_precedence() {

int moment = 5;
Observable<Long> messages = Observable.timer(moment, TimeUnit.SECONDS);
TestObserver<Long> testObserver = Observable.timer(moment, TimeUnit.SECONDS)
.takeUntil(messages)
.test();

scheduler.advanceTimeTo(moment, TimeUnit.SECONDS);
testObserver.assertNoValues();
testObserver.assertTerminated();
}

在同一时刻,执行takeUntil并且不发出任何值。

但在服务器上,在现实世界中,当 je JVM 同时执行多项操作时,timer 可能优先于 takeUntil

我需要一种方法来使这种优先级在任何环境下都稳定,这样当messagestimer耗尽的同时发出一个值时,就不会发出任何值。

最佳答案

你需要Observable.amb()我想 - http://reactivex.io/documentation/operators/amb.html

更新:好的,现在问题更清楚了。也许 Scheduler.single() 会有所帮助。

关于java - Rx-java 2 : How to set priority between Observables?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54631376/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com