gpt4 book ai didi

android - 使用 RxJava 在 Activity 之间共享 Observable 并取消订阅

转载 作者:太空宇宙 更新时间:2023-11-03 13:12:37 30 4
gpt4 key购买 nike

我正在尝试实现一个计时器 Observable,它在我的应用程序的 Activities 之间共享。我正在一个 Dagger 单例类上实现,我将其注入(inject)到每个不同 Activity 的每个 Presenter 中。

我以这种方式创建了一次 Observable:

Observable.defer(() -> Observable.timer(milliseconds, TimeUnit.MILLISECONDS).map(t -> this::doSomethingCool()))
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.share();

我使用以下函数从 Presenter 订阅:

public Observable<Status> register(Callback callback) {

PublishSubject<Status> subject = PublishSubject.create();
subject.subscribe(status -> {},
throwable -> L.LOGE(TAG, throwable.getMessage()),
() -> callback.onStatusChanged(mBasketStatus));

mObservable.subscribe(subject);
basketCounterCallback.onStatusChanged(status));

subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
return subject.asObservable();
}

我将 Subject 存储为每个演示者的 Observable,然后调用: obs.unsubscribeOn(AndroidSchedulers.mainThread())取消订阅(在 onPause() 方法中)。我还尝试使用调度程序 Schedulers.immediate()

取消订阅

但回调无论如何都会被调用 X 次(其中 X 是我已订阅计时器的所有 Presenter),因此它不会取消订阅。此外,日志 “Unsubscribed from subject!” 没有被调用。

如何正确取消订阅每个主题?

提前致谢

编辑:

根据评论添加了更多实现细节:

这是我创建 Observable 并存储在 SingletonStatusManager 的成员中的部分(状态也是单例):

private Observable<BasketStatus> mObservable;
private Status mStatus;

public Observable<BasketStatus> start(long milliseconds, Status status, Callback callback) {

if (mObservable == null) mObservable = createObservable(milliseconds, status);

return register(callback);
}

private Observable<BasketStatus> createObservable(long milliseconds, Status status) {

mStatus = status;

return Observable.defer(() -> Observable.timer(milliseconds, TimeUnit.MILLISECONDS).map(t -> status.upgradeStatus()))
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.share();
}

public Observable<BasketStatus> register(Callback callback) {

PublishSubject<Status> subject = PublishSubject.create();
subject.subscribe(status -> {},
throwable -> L.LOGE(TAG, throwable.getMessage()),
() -> callback.onStatusChanged(mStatus));

mObservable.subscribe(subject);
callback.onStatusChanged(mStatus));

subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
return subject.asObservable();
}

从启动计时器的Presenter 调用方法start(...) 后,我调用register(...) 来自下一个 Presenters 的方法:

class Presenter implements Callback {

private Observable<BasketStatus> mRegister;

@Inject
public Presenter(Status status, StatusManager statusManager) {
mRegister = statusManager.start(20000, status, this);
}

// Method called from onPause()
public void unregisterFromBasketStatus() {
mRegister.unsubscribeOn(Schedulers.immediate());
}
}

还有下一位主持人...

@Inject
public NextPresenter(StatusManager statusManager) {
mBasketStatusManager.register(this);
}

最佳答案

正如我在评论中提到的,您没有得到 unsubscribeOn 运算符的行为。它不会取消订阅任何内容,而是告诉每个订阅者,它应该在何时取消订阅时在哪里工作。这也没有意义,因为您不是取消订阅 Observable,而是取消订阅 Subscription,它表示观察者与流本身之间的连接。

回到你的问题。你的代码现在的设计方式,你返回的主题是完全无用的。您正在订阅 register 方法中可观察到的计时器,但是您将这些通知转发给主题,此后任何人都不会订阅。如果你真的需要它们,例如在我们看不到的代码的某些部分,你需要存储 subscribe() 方法的结果,这是一个 Subscription对象并在需要时对其调用 unsubscribe()(我认为在 unregisterFromBasketStatus 中)。

但是代码中还有更多的问题。例如:

subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));
return subject.asObservable();

在第一行中,您没有将该操作的结果存储在任何地方。由于 Observables 是不可变的结构,每个运算符(operator)都会创建一个新的流来接收来自前一个的通知。由此可见,当您在第二行返回 subject.asObservable() 时,您返回的不是第一行修改后的可观察对象,而是未修改的旧对象。要更正此问题,只需将 subject 变量替换为结果:subject = subject.doOnUnsubscribe(() -> L.LOGD(TAG, "Unsubcribed from subject!"));

其次,使用流的原因之一是完全取代回调。当然,它会起作用,但你正在减轻 Rx 给代码库带来的很多好处。然而,它的可读性要差得多,任何在你之后不得不处理你的代码的人都会把你诅咒到 hell ,以至于他可能真的成功了:-))我明白,Rx 从一开始就很难学习,但是试试尽可能避免这种情况。

关于android - 使用 RxJava 在 Activity 之间共享 Observable 并取消订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42085254/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com