- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在尝试实现一个计时器 Observable
,它在我的应用程序的 Activities
之间共享。我正在一个 Dagger 单例类上实现,我将其注入(inject)到每个不同 Activity
的每个 Presenter
中。
我以这种方式创建了一次 Observable:
Observable.defer(() -> Observable.timer(milliseconds, TimeUnit.MILLISECONDS).map(t -> this::doSomethingCool()))
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.share();
我使用以下函数从 Presenter 订阅:
public Observable<Status> register(Callback callback) {
PublishSubject<Status> subject = PublishSubject.create();
subject.subscribe(status -> {},
throwable -> L.LOGE(TAG, throwable.getMessage()),
() -> callback.onStatusChanged(mBasketStatus));
mObservable.subscribe(subject);
basketCounterCallback.onStatusChanged(status));
subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
return subject.asObservable();
}
我将 Subject
存储为每个演示者的 Observable
,然后调用: obs.unsubscribeOn(AndroidSchedulers.mainThread())
取消订阅(在 onPause()
方法中)。我还尝试使用调度程序 Schedulers.immediate()
但回调无论如何都会被调用 X 次(其中 X 是我已订阅计时器的所有 Presenter),因此它不会取消订阅。此外,日志 “Unsubscribed from subject!”
没有被调用。
如何正确取消订阅每个主题?
提前致谢
编辑:
根据评论添加了更多实现细节:
这是我创建 Observable 并存储在 Singleton
类 StatusManager
的成员中的部分(状态也是单例):
private Observable<BasketStatus> mObservable;
private Status mStatus;
public Observable<BasketStatus> start(long milliseconds, Status status, Callback callback) {
if (mObservable == null) mObservable = createObservable(milliseconds, status);
return register(callback);
}
private Observable<BasketStatus> createObservable(long milliseconds, Status status) {
mStatus = status;
return Observable.defer(() -> Observable.timer(milliseconds, TimeUnit.MILLISECONDS).map(t -> status.upgradeStatus()))
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.share();
}
public Observable<BasketStatus> register(Callback callback) {
PublishSubject<Status> subject = PublishSubject.create();
subject.subscribe(status -> {},
throwable -> L.LOGE(TAG, throwable.getMessage()),
() -> callback.onStatusChanged(mStatus));
mObservable.subscribe(subject);
callback.onStatusChanged(mStatus));
subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
return subject.asObservable();
}
从启动计时器的Presenter
调用方法start(...)
后,我调用register(...)
来自下一个 Presenters 的方法:
class Presenter implements Callback {
private Observable<BasketStatus> mRegister;
@Inject
public Presenter(Status status, StatusManager statusManager) {
mRegister = statusManager.start(20000, status, this);
}
// Method called from onPause()
public void unregisterFromBasketStatus() {
mRegister.unsubscribeOn(Schedulers.immediate());
}
}
还有下一位主持人...
@Inject
public NextPresenter(StatusManager statusManager) {
mBasketStatusManager.register(this);
}
最佳答案
正如我在评论中提到的,您没有得到 unsubscribeOn
运算符的行为。它不会取消订阅任何内容,而是告诉每个订阅者,它应该在何时取消订阅时在哪里工作。这也没有意义,因为您不是取消订阅 Observable,而是取消订阅 Subscription,它表示观察者与流本身之间的连接。
回到你的问题。你的代码现在的设计方式,你返回的主题是完全无用的。您正在订阅 register 方法中可观察到的计时器,但是您将这些通知转发给主题,此后任何人都不会订阅。如果你真的需要它们,例如在我们看不到的代码的某些部分,你需要存储 subscribe()
方法的结果,这是一个 Subscription
对象并在需要时对其调用 unsubscribe()
(我认为在 unregisterFromBasketStatus
中)。
但是代码中还有更多的问题。例如:
subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
return subject.asObservable();
在第一行中,您没有将该操作的结果存储在任何地方。由于 Observables 是不可变的结构,每个运算符(operator)都会创建一个新的流来接收来自前一个的通知。由此可见,当您在第二行返回 subject.asObservable()
时,您返回的不是第一行修改后的可观察对象,而是未修改的旧对象。要更正此问题,只需将 subject
变量替换为结果:subject = subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
其次,使用流的原因之一是完全取代回调。当然,它会起作用,但你正在减轻 Rx 给代码库带来的很多好处。然而,它的可读性要差得多,任何在你之后不得不处理你的代码的人都会把你诅咒到 hell ,以至于他可能真的成功了:-))我明白,Rx 从一开始就很难学习,但是试试尽可能避免这种情况。
关于android - 使用 RxJava 在 Activity 之间共享 Observable 并取消订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42085254/
一段时间后,我阅读了有关 RxJava concat 的内容,并决定测试一下我的理解力。但是我遇到了一些我不太理解的行为。 问题是,当我连接两个可观察对象时,根据我将它们传递给 Observable.
我正在使用来自数据库服务的数据实现自动完成: @Injectable() export class SchoolService { constructor(private db: AngularF
我正在尝试使用 RxJS 创建一个可观察的对象,它可以执行如图所示的操作。 获取一个值并等待一段固定的时间才能获得 下一个。 下一个将是该周期内发出的最后一个值 等等,跳过其余部分。 如果等待时间间隔
我有一个可观察对象和另一个提供的可观察对象改变 key 。我想构建一个在之间切换的可观察对象基于该键的对象中的可观察值。 示例: // Choose randomly between "up" or
我使用 protobuffers 在我的前端和我的 Dart 服务器之间进行通信。 那些对象没有实现 Observable . 我的 Dart 聚合物对象看起来像: @CustomTag('user-
在 java swing 项目中,我有一个模型类,它保存某个 JPanel 的状态。我需要使这些数据可供 View 使用。我认为有两种选择。有一个扩展 Observable 的类并将模型作为实例变量。
我想找到一种方法来检测观察者是否已完成使用我使用 Rx.Observable.create 创建的自定义可观察对象,以便自定义可观察对象可以结束它并正确地进行一些清理。 因此,我创建了一些测试代码,如
我正在尝试查询数据库。迭代结果列表,并为每一项再执行一个请求。在 rxjs 构建结束时,我有 Observable[]> 。但我需要Observable 。如何做到这一点? this.caseServ
我希望我的 api 上有一个方法返回 Observable> 但我希望该方法中的代码知道所有包含的 Observables 是否已完成,以便它可以关闭某些内容。最好的方法是什么? 更明确地说,我希望完
我有两个方法返回 Observable> firstAPI.getFirstInfo("1", "2"); secondApi.getSecondInfo(resultFirstObservable,
我有一个 Observable返回单个 Cursor实例(Observable)。我正在尝试利用 ContentObservable.fromCursor获取 onNext 中每个游标的行回调。 我想
我有两种返回 Observable 的方法: Observable firstObservable(); Observable secondObservable(String value); 对于第一
我正在尝试创建一个将用户数据作为 Observable 的函数,并使用来自第一个 observable 的数据从查询中添加/合并数据,然后将所有这些数据作为一个 observable 返回,我可以这样
我有一个 spec-compliant ECMAScript Observable ,具体来自 wonka library .我正在尝试将这种类型的 observable 转换为 rxjs 6 obs
为了简化问题,我在这里使用了数字和字符串。代码: const numbers$:Observable = of([1,2,3]); const strings: string[] = ["a","b"
对于我的 Android 应用程序,我需要一个 Observable 来聚合来自 7 个不同搜索的结果并作为一个集合发出。 对于最终发射,我选择了 ListMultimap其中 Content是搜索结
我正在使用改造 2.0.0-beta2 并且调试构建工作正常,但我在使用 Proguard 发布构建时遇到以下错误。 这是更新后的 logcat 错误。 11-17 18:23:22.751 1627
observer.throw(err) 和 observer.error(err) 有什么区别? 我正在使用 RxJS 版本“5.0.0-beta.12” var innerObservable =
我们有一种情况,对服务的方法调用返回一个 IObservable但我们的客户期望 IObservable .将 T1 转换为 T2 很简单。 Rx 中有什么允许这样做的吗? (即链接观察者) 我知道我
我陷入了如何将以下可观察类型转换/转换为我的目标类型的困境: 我有可观察的类型: Observable>> 我想将其转换为: Observable> 所以当我订阅它时,它会发出 List不是Obser
我是一名优秀的程序员,十分优秀!