gpt4 book ai didi

java - 在异步调用中创建的 RxJava Observable 替代方案

转载 作者:搜寻专家 更新时间:2023-11-01 02:04:49 24 4
gpt4 key购买 nike

我听了这个演讲 https://www.youtube.com/watch?v=QdmkXL7XikQ&feature=youtu.be&t=274

听说我应该避免使用 create 方法创建 Observable,因为它不会自动处理取消订阅和背压,但我找不到在下面的代码中使用的替代方法。

compositeSubscription.add(
Observable.create(new Observable.OnSubscribe<DTOCompaniesCallback>() {
@Override
public void call(final Subscriber<? super DTOCompaniesCallback> subscriber) {

modelTrainStrike.getCompaniesFromServer(new CompaniesCallback() {
@Override
public void onResult(DTOCompaniesCallback dtoCompaniesCallback) {
try {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(dtoCompaniesCallback);
subscriber.onCompleted();
}
} catch (Exception e) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(e);
}
}
}
});

}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<DTOCompaniesCallback>() {
@Override
public void call(DTOCompaniesCallback dtoCompaniesCallback) {
Log.i("TAG", "onResult: " + dtoCompaniesCallback.getCompaniesList().size());
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
throw new OnErrorNotImplementedException("Source!", throwable);
}
})
);

然后我在 OnDestroy 方法中调用清除 CompositeSubscription

@Override
public void onDestroy() {
if (compositeSubscription != null) {
compositeSubscription.clear();
}
}

您是否看到我可以在此处使用的 create 方法的任何替代方法?您是否看到任何潜在的危险或这种方法安全吗?谢谢

最佳答案

你可以使用 defer + AsyncSubject:

Observable.defer(() -> {
AsyncSubject<DTOCompaniesCallback> async = AsyncSubject.create();
modelTrainStrike.getCompaniesFromServer(v -> {
async.onNext(v);
async.onComplete();
});
return async;
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
...

如果 getCompaniesFromServer 支持取消,您可以:

Observable.defer(() -> {
AsyncSubject<DTOCompaniesCallback> async = AsyncSubject.create();
Closeable c = modelTrainStrike.getCompaniesFromServer(v -> {
async.onNext(v);
async.onComplete();
});
return async.doOnUnsubscribe(() -> {
try { c.close(); } catch (IOException ex) { }
});
})

关于java - 在异步调用中创建的 RxJava Observable 替代方案,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36765336/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com