gpt4 book ai didi

android - RxJava Subject with Backpressure - 只有在下游完成消费后才发出最后一个值

转载 作者:太空宇宙 更新时间:2023-11-03 12:46:51 24 4
gpt4 key购买 nike

我有一个 PublishSubject,它在某些 UI 事件上调用 onNext()。订阅者通常需要 2 秒来完成其工作。我需要在订阅者忙碌时忽略对 onNext() 的所有调用,但最后一个除外。我尝试了以下方法,但是我无法控制流程。请求似乎排队,每个请求都得到处理(因此背压似乎不起作用)。我怎样才能让它忽略除最后一个请求之外的所有请求? (我不想使用 debounce,因为代码需要立即使用react,任何合理的小超时都不会起作用)。

此外,我意识到对主题使用 subscribeOn 没有任何效果,因此我使用 observeOn 在其中一个运算符中执行异步工作。这是正确的做法吗?

Subject<Boolean> loadingQueue = PublishSubject.<Boolean>create().toSerialized();

loadingQueue
.toFlowable(BackpressureStrategy.LATEST)
.observeOn(AndroidSchedulers.mainThread())
.map(discarded -> {
// PRE-LOADING
Log.d("RXLOADING", "PRE-LOADING: " + Thread.currentThread().getName());
return discarded;
})
.observeOn(Schedulers.computation())
.map(b -> {
Log.d("RXLOADING", "LOADING: " + Thread.currentThread().getName());
Thread.sleep(2000);
return b;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(b -> {
Log.d("RXLOADING", "FINISHED: " + Thread.currentThread().getName() + "\n\n");
});


loadingQueue.onNext(true);
loadingQueue.onNext(true);
loadingQueue.onNext(true);
....

我看到的输出是:

PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
LOADING: RxComputationThreadPool-1
FINISHED: main
FINISHED: main

相反,我希望代码执行以下操作(即加载一次,并且在加载时,背压以阻止所有请求并发出最后一个,一旦第一个观察者完成 - 所以总的来说它应该理想最多只加载两次):

PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main

PRE-LOADING: main
LOADING: RxComputationThreadPool-1
FINISHED: main

最佳答案

您不能使用 observeOn 执行此操作,因为它会缓冲至少 1 个元素,因此如果已经发生一个“加载”,则始终执行“预加载”阶段。

但是您可以使用 delay 来做到这一点,因为它不会操纵链上的请求数量,并且会在调度程序上单独安排每个 onNext,而无需自行排队:

public static void main(String[] args) throws Exception {
Subject<Boolean> loadingQueue =
PublishSubject.<Boolean>create().toSerialized();

loadingQueue
.toFlowable(BackpressureStrategy.LATEST)
.delay(0, TimeUnit.MILLISECONDS, Schedulers.single()) // <-------
.map(discarded -> {
// PRE-LOADING
System.out.println("PRE-LOADING: "
+ Thread.currentThread().getName());
return discarded;
})
.delay(0, TimeUnit.MILLISECONDS, Schedulers.computation()) // <-------
.map(b -> {
System.out.println("LOADING: "
+ Thread.currentThread().getName());
Thread.sleep(2000);
return b;
})
.delay(0, TimeUnit.MILLISECONDS, Schedulers.single()) // <-------
.rebatchRequests(1) // <----------------------------------- one-by-one
.subscribe(b -> {
System.out.println("FINISHED: "
+ Thread.currentThread().getName() + "\n\n");
});


loadingQueue.onNext(true);
loadingQueue.onNext(true);
loadingQueue.onNext(true);

Thread.sleep(10000);
}

关于android - RxJava Subject with Backpressure - 只有在下游完成消费后才发出最后一个值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41958423/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com