我有一个基于时间的算法,在某些情况下,不同的可观测值可能在同一时刻发生两次发射。
请参阅下面的代码:
int moment = ...; //calculated
Observable.timer(moment, unit)
.takeUntil(messages)
.map(e -> new Action())
messages
Observable 在 Schedulers.computation()
上执行,就像计时器一样。
当消息与计时器同时到达时,行为是随机的。有时会发出新的 Action
,有时则不会,takeUntil(messages)
优先。
我需要完全控制这种行为。当然,我可以在 moment
中添加几毫秒,这样它就总是在 messages
之后执行,但这不是我想要的。
如何告诉 RxJava 可观察的消息
优先于计时器?
更新:
为了澄清问题,我添加了这个测试,每次执行它都会成功
//before launching the test
RxJavaPlugins.setComputationSchedulerHandler(scheduler ->
new TestScheduler());
@Test
public void rx_precedence() {
int moment = 5;
Observable<Long> messages = Observable.timer(moment, TimeUnit.SECONDS);
TestObserver<Long> testObserver = Observable.timer(moment, TimeUnit.SECONDS)
.takeUntil(messages)
.test();
scheduler.advanceTimeTo(moment, TimeUnit.SECONDS);
testObserver.assertNoValues();
testObserver.assertTerminated();
}
在同一时刻
,执行takeUntil
并且不发出任何值。
但在服务器上,在现实世界中,当 je JVM 同时执行多项操作时,timer
可能优先于 takeUntil
。
我需要一种方法来使这种优先级在任何环境下都稳定,这样当messages
在timer
耗尽的同时发出一个值时,就不会发出任何值。
我是一名优秀的程序员,十分优秀!