gpt4 book ai didi

java - RxJava Flowable.create(),如何尊重subscribeOn()线程

转载 作者:太空狗 更新时间:2023-10-29 13:46:10 25 4
gpt4 key购买 nike

我正在将自定义库 (dataClient) 回调 api 包装到 RxJava Flowable。 dataClient 使用自己的线程,因此它的回调在自己的线程上调用。

在我的 Rx 链中,我尝试使用 .subscribeOn(Schedulers.computation()) 指定计算调度程序。尽管如此,当我在我的 Rx 链上打印线程名称时,我得到了我的 dataClient 线程。

我应该怎么做,才能使我的 Flowable 使用 .subscribeOn() 中指定的线程?

Flowable.create({ emitter ->
dataClient.setCallback(object : Callback {
override fun message(message: DataModel) {
emitter.onNext(vehicle)
}

override fun done() {
emitter.onComplete()
}
})
emitter.setCancellable {
dataClient.setCallback(null)
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.computation())
.doOnNext { Log.e("DATA", Thread.currentThread().name) }
.observeOn(AndroidSchedulers.mainThread())
.subscribe { data -> Log.d("DATA", "Got data" + data.id)) }

最佳答案

subscribeOn 调度程序确保订阅在相关线程上完成。订阅恰好发生,它的处理方式与 observeOn 调度程序不同,后者在新线程上安排元素的发射。

Flowable.create({ emitter ->
// this runs with the computation scheduler
dataClient.setCallback(object : Callback {
override fun message(message: DataModel) {
// this runs on the thread it's called from
emitter.onNext(vehicle)
}

override fun done() {
// this runs on the thread it's called from
emitter.onComplete()
}
})
emitter.setCancellable {
dataClient.setCallback(null)
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.computation())
.doOnNext {
// this runs on the thread of the onNext call
Log.e("DATA", Thread.currentThread().name)
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
// this runs on the main thread
data -> Log.d("DATA", "Got data" + data.id))
}

由于您的订阅代码不是阻塞的,也不维护发射线程,因此设置 subscribeOn 不是必需的,可以省略。它主要只对同步源有效。

关于java - RxJava Flowable.create(),如何尊重subscribeOn()线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53957833/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com