gpt4 book ai didi

java - RxJava : How to retain Thread from upstream

转载 作者:搜寻专家 更新时间:2023-11-01 09:39:14 26 4
gpt4 key购买 nike

在 Android 上,某些回调始终按设计在主 Thread 上调用,例如以下 ServiceConnection:

private Completable dismissService() {
return Completable.fromEmitter(new Action1<CompletableEmitter>() {
@Override
public void call(final CompletableEmitter completableEmitter) {
final ServiceConnection conn = new ServiceConnection() {
@Override
public void onServiceConnected(ComponentName name, IBinder service) {
// here always main Thread...
unbindService(this);
completableEmitter.onCompleted();
}

@Override
public void onServiceDisconnected(ComponentName name) {
// no-op
}
};

completableEmitter.setCancellation(new AsyncEmitter.Cancellable() {
@Override
public void cancel() throws Exception {
unbindService(conn);
}
});

bindService(new Intent(MainActivity.this, MyService.class), conn, BIND_AUTO_CREATE);
}
});
}

但是,我希望 dismissService() 返回的 Completable 在调用它的任何 Thread 上发出它的结果。我使用 newSingleThreadExecutor() 尝试了以下(hacky?)解决方案:

private Completable dismissServiceRetainingThread() {
return Single.fromCallable(new Callable<Thread>() {
@Override
public Thread call() throws Exception {
return Thread.currentThread();
}
}).flatMapCompletable(new Func1<Thread, Completable>() {
@Override
public Completable call(final Thread thread) {
return Completable.fromEmitter(new Action1<CompletableEmitter>() {
@Override
public void call(final CompletableEmitter completableEmitter) {
final ServiceConnection conn = new ServiceConnection() {
@Override
public void onServiceConnected(ComponentName name, IBinder service) {
// here always main Thread...
unbindService(this);
completableEmitter.onCompleted();
}

@Override
public void onServiceDisconnected(ComponentName name) {
// no-op
}
};

completableEmitter.setCancellation(new AsyncEmitter.Cancellable() {
@Override
public void cancel() throws Exception {
unbindService(conn);
}
});

bindService(new Intent(MainActivity.this, MyService.class), conn, BIND_AUTO_CREATE);
}
}).observeOn(Schedulers.from(
Executors.newSingleThreadExecutor(
new ThreadFactory() {
@Override
public Thread newThread(@NonNull Runnable runnable) {
return thread;
}
}
)));
}
});
}

但是,这会因以下 IllegalThreadStateException 而崩溃:

E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.jenzz.rxjavathreadingtest, PID: 5307
java.lang.IllegalThreadStateException
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:930)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1348)
at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:591)
at rx.internal.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.schedule(ExecutorScheduler.java:79)
at rx.Completable$28$1.onCompleted(Completable.java:1805)
at rx.internal.operators.CompletableFromEmitter$FromEmitter.onCompleted(CompletableFromEmitter.java:73)
at com.jenzz.rxjavathreadingtest.MainActivity$5$2$1.onServiceConnected(MainActivity.java:117)

关于如何跳回到之前在上游使用的原始 Thread 的任何想法,例如使用 subscribeOn(Schedulers.io())?

最佳答案

您不想返回到您所在的确切线程。当同一个 Scheduler 中的其他线程空闲时,它可能很忙,除非它是主线程,否则无法保证它在回调返回时仍然存在。一旦您的 call 方法返回,它就可以用于更多工作或清理。我认为您想要的是在与原始调用相同的 Scheduler 上执行。 AFIAK,也没有直接的方法来确定线程的当前 Scheduler(有一种确定某些调度程序的脆弱方法,在本文底部提到)。所以你不能轻易地做你想做的事。默认在主线程上通知似乎是此方法的理智行为。如果您希望它默认为不同的 Scheduler,您可以添加 .observeOn(Schedulers.io()) 或您选择的调度程序。

如另一个答案所述,如果当前线程有一个循环程序,您可以尝试创建一个 Handler。您仍然会依赖调用者来确保回调发生时线程仍然可行。与告诉调用者响应将出现在主线程上相比,这似乎是一种更高级别的责任,除非他们使用 observesOn 选择不同的响应。

作为最后的想法,可以通过查看线程的名称来为 Schedulers 返回的标准调度程序确定正确的调度程序。它们具有可预测的前缀,例如 RxNewThreadScheduler-1,因此一些 String.startsWith() 调用可以隔离正确的调度。不过,这非常脆弱,因为它无法正确处理用户创建的调度程序,并且线程命名方案将来可能会发生变化。

关于java - RxJava : How to retain Thread from upstream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40900349/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com