android - Returning a subscriber in RxJava after storing data fetched from a web service

Reposted. Author: 行者123. Updated: 2023-11-29 23:49:34

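I am trying to call a web service to fetch data and store it in a database using the code below. I created a separate class to do this.

The problem is that once the data has been fetched and stored successfully, I want to notify my Activity. If an error occurs, I want to show it in the UI.

I managed to write code that fetches the data with pagination, but I am not sure how to notify the UI, that is, where I can subscribe to receive progress updates and errors (if any).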

public Flowable<Response> getFitnessData() {

    Request request = new Request();
    request.setAccess_token("d80fa6bd6f78cc704104d61146c599bc94b82ca225349ee68762fc6c70d2dcf0");

    Flowable<Response> fitnessFlowable = new WebRequest()
            .getRemoteClient()
            .create(FitnessApi.class)
            .getFitnessData("5b238abb4d3590001d9b94a8", request.toMap());

    fitnessFlowable.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .takeUntil(response -> response.getSummary().getNext() != null)
            .subscribe(new Subscriber<Response>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(Response response) {
                    Log.e(TAG, "onNext");

                    if (response != null) {
                        if (response.getFitness() != null && response.getFitness().size() != 0) {
                            Realm realm = Realm.getDefaultInstance();
                            realm.executeTransactionAsync(new Realm.Transaction() {
                                @Override
                                public void execute(Realm realm) {
                                    realm.copyToRealmOrUpdate(response.getFitness());
                                }
                            }, new Realm.Transaction.OnSuccess() {
                                @Override
                                public void onSuccess() {
                                    Log.i(TAG, "onSuccess , fitness data saved");
                                }
                            }, new Realm.Transaction.OnError() {
                                @Override
                                public void onError(Throwable error) {
                                    Log.i(TAG, "onError , fitness data failed to save" + error.getMessage());
                                }
                            });
                        } else {
                            Log.i(TAG, "onError , no fitness data available");
                        }
                    } else {
                        Log.i(TAG, "onError , response is null");
                    }
                }

                @Override
                public void onError(Throwable t) {
                    Log.e(TAG, "onError" + t.getMessage());
                }

                @Override
                public void onComplete() {
                    Log.e(TAG, "onComplete");
                }
            });

    return null;
}

Update

I created an RxBus to propagate events and errors to the UI:

public class RxBus {

    private static final RxBus INSTANCE = new RxBus();

    private RxBus() {}

    private PublishSubject<Object> bus = PublishSubject.create();

    public static RxBus getInstance() {
        return INSTANCE;
    }

    public void send(Object o) {
        bus.onNext(o);
    }

    public void error(Throwable e) {
        bus.onError(e);
    }

    public Observable<Object> toObservable() {
        return bus;
    }
}

In the Activity:

FitnessRepo fitnessRepo = new FitnessRepo();
fitnessRepo.getFitnessData();
RxBus.getInstance().toObservable().subscribe(new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(Object o) {
        if (o instanceof RealmList) {
            RealmList<Fitness> realmList = (RealmList<Fitness>) o;
            Log.e(TAG, "Fitness data size " + realmList.size());
        }
    }

    @Override
    public void onError(Throwable e) {
        Log.e(TAG, e.getMessage() + "");

        if (e instanceof HttpException) {
            ResponseBody body = ((HttpException) e).response().errorBody();

            try {
                Response response = LoganSquare.parse(body.byteStream(), Response.class);

                if (response.getErrors() != null)
                    if (response.getErrors().size() > 0)
                        Log.e(TAG, "Error " + response.getErrors().get(0).getErrors());
            } catch (IOException t) {
                t.printStackTrace();
            }
        }
    }

    @Override
    public void onComplete() {
    }
});

In the web service call:

public void getFitnessData() {

    Request request = new Request();
    request.setAccess_token("d80fa6bd6f78cc704104d61146c599bc94b82ca225349ee68762fc6c70d2dcf0");
    request.setEnd_date("2018-07-01T00:00:00");
    Flowable<Response> fitnessFlowable = new WebRequest()
            .getRemoteClient()
            .create(FitnessApi.class)
            .getFitnessData("5b238abb4d3590001d9b94a8", request.toMap());

    fitnessFlowable.subscribeOn(Schedulers.io())
            .takeUntil(response -> response.getSummary().getNext() != null)
            .doOnNext((response) -> {
                if (response == null || response.getFitness() == null || response.getFitness().isEmpty()) {
                    Log.e(TAG, " Error ");
                    return;
                }

                RxBus.getInstance().send(response.getFitness());

                try (Realm r = Realm.getDefaultInstance()) {
                    r.executeTransaction((realm) -> {
                        realm.copyToRealmOrUpdate(response.getFitness());
                    });
                }
            }).subscribe(item -> {
            },
            error -> {
                RxBus.getInstance().error(error);
            });
}

Best answer

Good news: you can delete almost all of this code, and the result will be better overall.

public void fetchFitnessData() {

    Request request = new Request();
    request.setAccess_token("d80fa6bd6f78cc704104d61146c599bc94b82ca225349ee68762fc6c70d2dcf0");

    Flowable<Response> fitnessFlowable = new WebRequest()
            .getRemoteClient()
            .create(FitnessApi.class)
            .getFitnessData("5b238abb4d3590001d9b94a8", request.toMap());

    fitnessFlowable.subscribeOn(Schedulers.io())
            .takeUntil(response -> response.getSummary().getNext() != null)
            .doOnNext((response) -> {
                // Nothing to store for an empty response.
                if (response == null || response.getFitness() == null || response.getFitness().isEmpty()) return;

                // Open and close the Realm on the background thread that runs this callback.
                try (Realm r = Realm.getDefaultInstance()) {
                    r.executeTransaction((realm) -> {
                        realm.insertOrUpdate(response.getFitness());
                    });
                }
            }).subscribe();
}
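The takeUntil(...) call in the method above is worth a closer look. In RxJava, takeUntil with a predicate delivers each item first and then completes the stream as soon as the predicate returns true for that item. The sketch below mimics that behavior in plain Java on a list, so it runs without RxJava. TakeUntilSketch, Page, and samplePages are hypothetical names, and treating a null next link as "last page" is an assumption about the API in the question, not something the thread confirms.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java stand-in for the semantics of RxJava's takeUntil(Predicate):
// each item is emitted first, then the stop predicate is checked.
class TakeUntilSketch {

    // Hypothetical page type: 'next' links to the following page, or is null on the last page.
    static final class Page {
        final String name;
        final String next;

        Page(String name, String next) {
            this.name = name;
            this.next = next;
        }
    }

    static <T> List<T> takeUntil(List<T> source, Predicate<T> stop) {
        List<T> emitted = new ArrayList<>();
        for (T item : source) {
            emitted.add(item);            // the item is delivered...
            if (stop.test(item)) break;   // ...and only then does the stream complete
        }
        return emitted;
    }

    // Three made-up pages; only the last one has no next link.
    static List<Page> samplePages() {
        List<Page> pages = new ArrayList<>();
        pages.add(new Page("page1", "page2"));
        pages.add(new Page("page2", "page3"));
        pages.add(new Page("page3", null));
        return pages;
    }

    public static void main(String[] args) {
        // Predicate as written in the thread: stops after the first page that has a next link.
        System.out.println(takeUntil(samplePages(), p -> p.next != null).size()); // prints 1
        // Opposite predicate: stops after the page that has no next link.
        System.out.println(takeUntil(samplePages(), p -> p.next == null).size()); // prints 3
    }
}
```

If the intent is to keep consuming pages until the last one, the `== null` form may be the one that matches it. That depends on how the upstream Flowable produces pages, which the thread does not show.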

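fetchFitnessData() now runs on a background thread and returns void, so the way to emit anything from it is to use a PublishSubject (one for success, one for failure) or an EventBus.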

// The subject must be created before anything can be emitted through it.
private final PublishSubject<Object> fitnessResults = PublishSubject.create();

public Observable<Object> observeFitnessResults() {
    return fitnessResults;
}

// Event emitted when a fetch succeeds.
public static class Success {
    public Success(List<Fitness> data) {
        this.data = data;
    }

    public List<Fitness> data;
}

// Event emitted when a fetch fails; sent via onNext so the subject stays alive.
public static class Failure {
    public Failure(Exception exception) {
        this.exception = exception;
    }

    public Exception exception;
}

public void fetchFitnessData() {
    ...
        fitnessResults.onNext(new Success(data));
    } catch(Exception e) {
        fitnessResults.onNext(new Failure(e));

Then:

Observable<Failure> errors = observeFitnessResults().ofType(Failure.class);
Observable<Success> success = observeFitnessResults().ofType(Success.class);
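One reason to send failures through onNext as ordinary events: a subject that receives onError is terminated and delivers nothing further, so a later successful fetch would never reach the UI. The following is a minimal plain-Java sketch of the same idea, runnable without RxJava. ResultBus, subscribeOfType, publish, and demo are hypothetical names standing in for the PublishSubject and ofType(...) calls above, and List<String> stands in for List<Fitness>.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java stand-in for a PublishSubject<Object> that carries both outcomes
// as ordinary events. All names here are hypothetical.
class ResultBus {

    static final class Success {
        final List<String> data; // stand-in for List<Fitness>

        Success(List<String> data) {
            this.data = data;
        }
    }

    static final class Failure {
        final Exception exception;

        Failure(Exception exception) {
            this.exception = exception;
        }
    }

    private final List<Consumer<Object>> listeners = new ArrayList<>();

    // Rough equivalent of subject.ofType(type).subscribe(handler).
    <T> void subscribeOfType(Class<T> type, Consumer<T> handler) {
        listeners.add(event -> {
            if (type.isInstance(event)) handler.accept(type.cast(event));
        });
    }

    // Rough equivalent of subject.onNext(event).
    void publish(Object event) {
        for (Consumer<Object> listener : new ArrayList<>(listeners)) {
            listener.accept(event);
        }
    }

    // Publishes a failure followed by a success and returns what each typed listener saw.
    static List<String> demo() {
        ResultBus bus = new ResultBus();
        List<String> log = new ArrayList<>();
        bus.subscribeOfType(Success.class, s -> log.add("success:" + s.data.size()));
        bus.subscribeOfType(Failure.class, f -> log.add("failure:" + f.exception.getMessage()));

        bus.publish(new Failure(new IllegalStateException("timeout")));

        // The bus is still usable after a failure, so a later fetch can report success.
        List<String> items = new ArrayList<>();
        items.add("run");
        items.add("walk");
        bus.publish(new Success(items));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [failure:timeout, success:2]
    }
}
```

In an Activity, the Success listener would typically refresh the list and the Failure listener would show the error message. With real RxJava you would also add observeOn(AndroidSchedulers.mainThread()) before subscribing, since the events are emitted from a background thread.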

Regarding "android - Returning a subscriber in RxJava after storing data fetched from a web service", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/51048199/
