gpt4 book ai didi

rx-java - RxJava - 与 switchMap() 运算符相反?

转载 作者:行者123 更新时间:2023-12-04 07:51:26 25 4
gpt4 key购买 nike

我想知道是否有一种方法可以组合现有的运算符来执行与 switchMap() 相反的操作.
switchMap()将追踪它收到的最新发射并取消任何 Observable它以前正在执行。假设我翻转了它,我想忽略所有进入 xxxMap() 的排放。当它忙于接收到的第一次发射时。它将继续忽略排放,直到它完成排放当前 Observable在它里面。然后它将处理它收到的下一个发射。

Observable.interval(1, TimeUnit.SECONDS)
.doOnNext(i -> System.out.println("Source Emitted Value: " + i))
.ignoreWhileBusyMap(i -> doIntensiveProcess(i).subcribeOn(Schedulers.computation()))
.subscribe(i -> System.out.println("Subscriber received Value: " + i));

有没有办法做到这一点?在上面的例子中,如果 intensiveProcess()持续三秒, ignoreWhileBusyMap()将处理 0但可能会忽略排放 12来自 interval() 。然后它会处理 3但可能会忽略 45 , 等等...

最佳答案

当然,通过在处理完成后设置的 bool 值对值的处理进行门控:

AtomicBoolean gate = new AtomicBoolean(true);

Observable.interval(200, TimeUnit.MILLISECONDS)
.flatMap(v -> {
if (gate.get()) {
gate.set(false);

return Observable.just(v).delay(500, TimeUnit.MILLISECONDS)
.doAfterTerminate(() -> gate.set(true));
} else {
return Observable.empty();
}
})
.take(10)
.toBlocking()
.subscribe(System.out::println, Throwable::printStackTrace);

编辑

选择:
Observable.interval(200, TimeUnit.MILLISECONDS)
.onBackpressureDrop()
.flatMap(v -> {
return Observable.just(v).delay(500, TimeUnit.MILLISECONDS);
}, 1)
.take(10)
.toBlocking()
.subscribe(System.out::println, Throwable::printStackTrace);

您可以更改 onBackpressureDroponBackpressureLatest立即继续使用最新值。

关于rx-java - RxJava - 与 switchMap() 运算符相反?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37171525/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com