gpt4 book ai didi

android - 从正确的线程调用 RxJava2 可取消/一次性

转载 作者:太空宇宙 更新时间:2023-11-03 13:43:27 27 4
gpt4 key购买 nike

我正在实现一个从 Resource 发出行的可观察对象.

问题是这个资源真的不喜欢从创建它的不同线程关闭(它会杀死一只小狗并在发生这种情况时抛出异常)。

当我处理订阅时,资源 Cancellable/Disposablemain 调用线程,而 observable 已在 Schedulers.io() 上订阅.

这是 Kotlin 代码:

fun lines(): Observable<String> =
Observable.create { emitter ->
val resource = NetworkResource()
emitter.setCancellable {
resource.close() // <-- main thread :(
}
try {
while (!emitter.isDisposed)
emitter.onNext(resource.readLine()) // <-- blocked here!
} catch (ioe: IOException) {
emitter.tryOnError(ioe) // <-- this also triggers the cancellable
}
}

val disposable = lines()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { Log.i(TAG, "Line: $it" }

disposable.dispose() // <-- main thread :)

问题:是否可以调用Cancellable来自正确的*线程,考虑到订阅线程在 resource.readLine() 上被阻塞?

*正确的线程意思是来自subscribeOn(Schedures.io())的线程.

编辑:恐怕这个问题没有正确答案,除非resource.close()resource.dataReady 上进行线程安全或某种轮询实现后线程不会被阻塞。

最佳答案

Schedulers.io() 管理一个线程池,因此它可能会也可能不会使用相同的线程来处理您的资源。您将必须使用自定义调度程序和 unsubscribeOn() 运算符来确保您的 Observable 在同一线程上订阅和取消订阅。像这样的东西:

Scheduler customScheduler = Schedulers.from(Executors.newSingleThreadExecutor());

val disposable = lines()
.unsubscribeOn(customScheduler)
.subscribeOn(customScheduler)
.observeOn(AndroidSchedulers.mainThread())
.subscribe { Log.i(TAG, "Line: $it" }

关于android - 从正确的线程调用 RxJava2 可取消/一次性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46820107/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com