gpt4 book ai didi

android - RxJava2 可流动 : send objects to server on by one and detect end

转载 作者:行者123 更新时间:2023-11-30 04:53:58 24 4
gpt4 key购买 nike

我需要将对象列表发送到我的远程服务器。由于它们可能很多而且很大,所以我使用一个可流动的对象从一个数组列表中使用 request(1) 将它们一个一个地发送。

对于每个对象,都会对服务器进行改造调用,作为返回,我会获得远程 ID,并使用远程 ID 更新本地对象。

我需要检测此任务的结束:即发送的最后一个对象的最后响应,以防止对同一对象的多个并发调用。

目前一切正常,但我在从远程服务器收到答案之前收到“已完成”消息,因此在对象更新之前。

我该怎么做?

Flowable<Integer> observable = Flowable.range(0, objList.size());

observable.subscribe(new DefaultSubscriber<Integer>() {
@Override
public void onStart() {
Log.d(TAG, "on start");
request(1);
}

@Override
public void onNext(Integer t) {
Log.d(TAG, "on next : " + t);
MyObj = objList.get(t);

RetrofitHelper.createService(ObjService.class, true, authType, authToken).createOrUpdateObj(objList.get(t)).flatMap(p -> {

Log.d(TAG, "recu p");

if (p != null) {
try {
p.setSyncho(true);
// save remote id on obj
ObjDB.updateObj(p);
request(1);

return Observable.empty();
} catch (Throwable th) {
ExceptionHandler.logException(th);
return Observable.error(th);
}
} else {
request(1);
return Observable.empty(); // provisoirement si pb on renvoie vide
}
})
.onErrorResumeNext(r -> {
request(1);
Observable.empty();
})
.onExceptionResumeNext(error -> Observable.empty()) // go to next on error
.subscribeOn(Schedulers.io()).onErrorReturn(error -> {
Log.d("ERROR", error.getMessage());
return 0;
})

.onErrorResumeNext(Observable.empty()).subscribe();
}


@Override
public void onError(Throwable t) {
Log.e("XXX ERROR ", "" + t);
request(1);
patientSynchroInProgress = Boolean.FALSE;
}

@Override
public void onComplete() {
Log.e("XXX COMPLETE", "complete");
}
});

最佳答案

您应该将改造调用移到 map(...) 运算符中:

Flowable<Integer> observable = Flowable.range(0, objList.size());

observable
.map(t -> {
MyObj = objList.get(t);

return RetrofitHelper.createService(ObjService.class, true, authType, authToken).createOrUpdateObj(objList.get(t)).flatMap(p -> {

Log.d(TAG, "recu p");

if (p != null) {
try {
p.setSyncho(true);
// save remote id on obj
ObjDB.updateObj(p);

return Observable.empty();
} catch (Throwable th) {
ExceptionHandler.logException(th);
return Observable.error(th);
}
} else {
return Observable.empty(); // provisoirement si pb on renvoie vide
}
})
.onErrorResumeNext(r -> {
Observable.empty();
})
.onExceptionResumeNext(error -> Observable.empty()) // go to next on error
.subscribeOn(Schedulers.io()).onErrorReturn(error -> {
Log.d("ERROR", error.getMessage());
return 0;
})

.onErrorResumeNext(Observable.empty())
})
.subscribe(new DefaultSubscriber<Integer>() {
@Override
public void onStart() {
Log.d(TAG, "on start");
}

@Override
public void onNext(Integer t) {
Log.d(TAG, "on next : " + t);
}


@Override
public void onError(Throwable t) {
Log.e("XXX ERROR ", "" + t);
patientSynchroInProgress = Boolean.FALSE;
}

@Override
public void onComplete() {
Log.e("XXX COMPLETE", "complete");
}
});

您正在 onNext(...) 中执行改造调用,因此您的网络响应可能不是连续的。通过使用 map(...) 运算符转换您的可观察对象,每个发射都将成为一个单独的网络调用。这允许您的 onNext(...) 函数打印改造调用的顺序结果,并允许您的 onComplete() 在所有后续调用完成时执行。

关于android - RxJava2 可流动 : send objects to server on by one and detect end,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59537035/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com