gpt4 book ai didi

rx-java - 超时后,ReactiveX 发出空值或哨兵值

转载 作者:行者123 更新时间:2023-12-04 15:20:03 25 4
gpt4 key购买 nike

寻找一种干净的方式来转换源 Observable发出单个 null (或哨兵值)在一段时间内不发射项目后。

例如,如果源 observable 发出 1, 2, 3然后停止发射 10 秒,然后发射 4, 5, 6我希望发出的项目是 1, 2, 3, null, 4, 5, 6 .

用例用于在 UI 中显示值,其中显示的值应变成短划线 -N/A如果最后发出的值是陈旧的/旧的。

我查看了 timeout运算符,但它终止了 Observable当超时发生时,这是不可取的。

使用 RxJava。

最佳答案

基于 akarnokd's answeran answer in a similar question ,另一种实现:

单个哨兵值(根据 OP)

如果您正在寻找一个值来指示排放之间的时间间隔:

final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();

final long duration = 100;
final Observable<Integer> timeout = Observable.just(-1).delay(duration, TimeUnit.MILLISECONDS, scheduler)
.concatWith(Observable.never())
.takeUntil(subject)
.repeat();

subject.mergeWith(timeout).subscribe(subscriber);

subject.onNext(1, 0);
subject.onNext(2, 100);
subject.onNext(3, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);

subject.onNext(4, 0);
subject.onNext(5, 100);
subject.onNext(6, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);

subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, 4, 5, 6));

连续哨兵值

如果您希望在源 observable 在一段时间内不发射后继续接收值:

final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();

final long duration = 100;
final Observable<Integer> timeout = Observable.interval(duration, duration, TimeUnit.MILLISECONDS, scheduler)
.map(x -> -1)
.takeUntil(subject)
.repeat();

subject.mergeWith(timeout).subscribe(subscriber);

subject.onNext(1, 0);
subject.onNext(2, 100);
subject.onNext(3, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);

subject.onNext(4, 0);
subject.onNext(5, 100);
subject.onNext(6, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);

subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, -1, -1, 4, 5, 6));

不同之处在于 timeout可观察的以及它是否重复出现。

您可以更换 -1null如所须。

以上所有内容均使用 RxJava 1.0.17 进行测试,使用 Java 1.8.0_72 .

关于rx-java - 超时后,ReactiveX 发出空值或哨兵值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35873244/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com