gpt4 book ai didi

android - 从远程 API 重用 RxJava 流

转载 作者:搜寻专家 更新时间:2023-11-01 08:45:02 24 4
gpt4 key购买 nike

我有一个 API 调用,我想用 Observable 包装它:

private Observable<RealmResults<Account>> getAccounts() {
final Observable<RealmResults<Account>> realmAccounts =
Observable.defer(new Func0<Observable<RealmResults<Account>>>() {
@Override
public Observable<RealmResults<Account>> call() {
return RealmObservable.results(getActivity(), new Func1<Realm, RealmResults<Account>>() {
@Override
public RealmResults<Account> call(Realm realm) {
return realm.where(Account.class).findAll();
}
});
}
});

return Observable
.create(new Observable.OnSubscribe<RealmResults<Account>>() {
@Override
public void call(final Subscriber<? super RealmResults<Account>> subscriber) {
DataBridge.getAccounts(Preferences.getString(Constant.ME_GUID, ""), new OnResponseListener() {
@Override
public void OnSuccess(Object data) {
Log.d("Stream", "onSuccess");
realmAccounts.subscribe(subscriber);
}

@Override
public void onFailure(Object data) {
subscriber.onError(new Exception(data.toString()));
}
});
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.startWith(realmAccounts);
}

我喜欢用它

Observable<Accounts> accounts = getAccounts().flatMap(
new Func1<RealmResults<Account>, Observable<Account>>() {
@Override
public Observable<Account> call(RealmResults<Account> accounts) {
return Observable.from(accounts);
}
});

如何在不每次都调用 API 的情况下多次使用 accounts observable。我需要处理帐户流并从中提取不同的数据集。

最佳答案

最简单的方法是使用运算符 cache ,内部使用 ReplaySubject .它缓存源可观察项,然后从缓存中提供结果。

... 
Observable<<RealmResults<Account>> cachedResult = getAccounts().cache();
Observable<Accounts> accountsObservable = cachedResult.flatMap(...);
Observable<X> xObservable = cachedResult.flatMap(...);

如果您想避免缓存结果,您应该使用 Connectable Observables。通常它只对 Hot Observables 有意义。 Connectable observable 在其 Connect 方法被调用之前不会开始发射项目。您可以使用 publish运算符转换为 Connectable Observable。

ConnectableObservable<<RealmResults<Account>> connectebleObservable = getAccounts().publish();
Observable<Accounts> accountsObservable = connectebleObservable .flatMap(...);
Observable<X> xObservable = connectebleObservable .flatMap(...);
//You must subscribe before connect
accountsObservable.subsribe(...);
xObservable.subscribe(...);
//start emiting data
connectebleObservable.connect();

这里重要的一点是你必须在连接之前订阅 - 以避免数据丢失 - 否则你必须使用重放操作符,它类似于缓存操作符,但用于可连接的可观察

那么分享呢?

它创建 ConnectableObservable 并将其作为常规 Observable 公开。首次订阅自动导致连接和发射。

在您的案例中使用的共享,如果不重播,可能会导致数据丢失或多次执行,具体取决于时间。例如,对于 2 个订阅者和流中的一个项目,您可能有以下情况:

  1. 2 个订阅在 onNext 之前创建 - 按预期工作。
  2. 第二个订阅在 onNext 之后但在 onComplete 之前创建 - 第二个订阅仅获得 onComplete
  3. 在 onComplete 之后创建的第二个订阅 - 2 次执行没有缓存

关于android - 从远程 API 重用 RxJava 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29283011/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com