我正在实现一个从 Resource
发出行的可观察对象.
问题是这个资源真的不喜欢从创建它的不同线程关闭(它会杀死一只小狗并在发生这种情况时抛出异常)。
当我处理订阅时,资源 Cancellable
/Disposable
从 main
调用线程,而 observable 已在 Schedulers.io()
上订阅.
这是 Kotlin 代码:
fun lines(): Observable<String> =
Observable.create { emitter ->
val resource = NetworkResource()
emitter.setCancellable {
resource.close() // <-- main thread :(
}
try {
while (!emitter.isDisposed)
emitter.onNext(resource.readLine()) // <-- blocked here!
} catch (ioe: IOException) {
emitter.tryOnError(ioe) // <-- this also triggers the cancellable
}
}
val disposable = lines()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { Log.i(TAG, "Line: $it" }
disposable.dispose() // <-- main thread :)
问题:是否可以调用Cancellable
来自正确的*线程,考虑到订阅线程在 resource.readLine()
上被阻塞?
*正确的线程意思是来自subscribeOn(Schedures.io())
的线程.
编辑:恐怕这个问题没有正确答案,除非resource.close()
在 resource.dataReady
上进行线程安全或某种轮询实现后线程不会被阻塞。
Schedulers.io()
管理一个线程池,因此它可能会也可能不会使用相同的线程来处理您的资源。您将必须使用自定义调度程序和 unsubscribeOn()
运算符来确保您的 Observable
在同一线程上订阅和取消订阅。像这样的东西:
Scheduler customScheduler = Schedulers.from(Executors.newSingleThreadExecutor());
val disposable = lines()
.unsubscribeOn(customScheduler)
.subscribeOn(customScheduler)
.observeOn(AndroidSchedulers.mainThread())
.subscribe { Log.i(TAG, "Line: $it" }
我是一名优秀的程序员,十分优秀!