android - Zip does not seem to wait for the Observables to emit data

Reposted. Author: 塔克拉玛干. Updated: 2023-11-03 00:20:44

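I am trying to run two parallel requests with Volley to update a database (using DBFlow). One table in the database can only be populated after both requests have completed and their data has been saved, because it holds foreign keys to both.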

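Given the example below, I expect the sector and employee fetch/insert chains to run in parallel, and the contracts to be saved once both inserts have completed.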

/**
 * Update the sectors, employees and their contracts
 * @return An Observable to watch for the process to complete
 *
 * sectorsFetch______sectorsInsert________contractsInsert
 * employeesFetch____employeesInsert___/
 */
public static Observable<Void> updateEverything() {
    try {
        Log.d(TAG, "Starting update...");
        Observable<JSONArray> employeesFetch = Observable.from(ForumAPI.getInstance().getEmployees());
        Observable<List<Contract>> employeesInsert = employeesFetch.flatMap(new Func1<JSONArray, Observable<List<Contract>>>() {
            @Override
            public Observable<List<Contract>> call(JSONArray employees) {
                Log.d(TAG, "Employee list fetched");
                return saveEmployees(employees);
            }
        });

        Observable<JSONArray> sectorsFetch = Observable.from(ForumAPI.getInstance().getSectors());
        Observable<Void> sectorsInsert = sectorsFetch.flatMap(new Func1<JSONArray, Observable<Void>>() {
            @Override
            public Observable<Void> call(JSONArray sectors) {
                Log.d(TAG, "Sector list fetched");
                return saveSectors(sectors);
            }
        });

        return Observable.zip(sectorsInsert, employeesInsert, new Func2<Void, List<Contract>, Void>() {
            @Override
            public Void call(Void aVoid, List<Contract> contracts) {
                Log.d(TAG, "Sectors and employees saved. Saving contracts");
                return saveContracts(contracts);
            }
        });

    } catch (InterruptedException | ExecutionException e) {
        Log.e(TAG, e.getMessage());
        return Observable.error(e);
    }
}

Note: ForumAPI's getEmployees and getSectors both return a Future.

The save methods are below.

/**
 * Parse and save an array of sectors
 * @param jsonSectors The array of sector to save
 * @return An Observable to watch for the process to complete.
 */
private static Observable<Void> saveSectors(JSONArray jsonSectors) {
    Log.d(TAG, "Transforming JSON sectors to object");
    List<Sector> sectList = new ArrayList<>();
    try {
        for (int i = 0; i < jsonSectors.length(); i++) {
            JSONObject jsonSect = jsonSectors.getJSONObject(i);
            Sector sect = Sector.build(jsonSect);
            sectList.add(sect);
        }
        Log.d(TAG, sectList.size() + " sectors fetched. Saving...");
        ForumDB.getDB().executeTransaction(
                FastStoreModelTransaction.insertBuilder(
                        FlowManager.getModelAdapter(Sector.class)
                ).addAll(sectList).build());

        Log.d(TAG, "Sector list saved");
    } catch (JSONException e) {
        Log.e(TAG, "Unable to parse sector list. " + e.getMessage());
        return Observable.error(e);
    }
    return Observable.empty();
}

/**
 * Parse and save an array of employees
 * @param jsonEmployees The array of employee to save
 * @return An Observable to watch for the process to complete.
 */
private static Observable<List<Contract>> saveEmployees(JSONArray jsonEmployees) {
    Log.d(TAG, "Transforming JSON employees to object");
    List<Person> empList = new ArrayList<>();
    List<Contract> contractList = new ArrayList<>();
    try {
        for (int i = 0; i < jsonEmployees.length(); i++) {
            JSONObject jsonEmp = jsonEmployees.getJSONObject(i);
            Person emp = Person.build(jsonEmp);
            empList.add(emp);
            JSONArray jsonContracts = jsonEmp.getJSONArray("sectors");
            for (int j = 0; j < jsonContracts.length(); j++) {
                Contract contract = new Contract();
                contract.setSectorId(jsonContracts.getJSONObject(j).getInt("id"));
                contract.setPersonForumId(emp.getForumId());
                contractList.add(contract);
            }
        }
        Log.d(TAG, empList.size() + " employees fetched. Saving...");
        ForumDB.getDB().executeTransaction(
                FastStoreModelTransaction.insertBuilder(
                        FlowManager.getModelAdapter(Person.class)
                ).addAll(empList).build());
        Log.d(TAG, "Employee list saved");
    } catch (JSONException e) {
        Log.e(TAG, "Unable to parse employee list. " + e.getMessage());
        return Observable.error(e);
    }
    return Observable.just(contractList);
}

/**
 * Save a list of contract
 * @param contracts The list of contract to save
 * @return An Observable to watch for the process to complete.
 */
private static Void saveContracts(List contracts) {
    ForumDB.getDB().executeTransaction(
            FastStoreModelTransaction.insertBuilder(
                    FlowManager.getModelAdapter(Contract.class)
            ).addAll(contracts).build());
    Log.d(TAG, "Contract list saved");

    return null;
}

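The problem is that when I subscribe to the overall Observable from my Android Activity, my observer's onCompleted is called right after sectorsFetch emits its data (neither sectorsInsert nor my zip function appears to be called).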

Here is the log:

D/com.xx.observable.DataUpdater: Starting update...
D/com.xx.helper.ForumAPI: Requesting employee list
D/com.xx.helper.ForumAPI: Request added to queue...
D/com.xx.helper.ForumAPI: Requesting sector list
D/com.xx.helper.ForumAPI: Request added to queue
D/com.xx.observable.DataUpdater: Sector list fetched
D/com.xx.observable.DataUpdater: Transforming JSON sectors to object
D/com.xx.observable.DataUpdater: 8 sectors fetched. Saving...
D/com.xx.observable.DataUpdater: Sector list saved
D/com.xx.activity.Startup: onCompleted reached

I can't find what is wrong. Is one of my Observables emitting something that causes my zip to be invoked before it should be?

Best answer

The documentation for zip was updated in 1.1.6 to describe this case:

The operator subscribes to its sources in order they are specified and completes eagerly if one of the sources is shorter than the rest while unsubscribing the other sources. Therefore, it is possible those other sources will never be able to run to completion (and thus not calling doOnCompleted()). This can also happen if the sources are exactly the same length; if source A completes and B has been consumed and is about to complete, the operator detects A won't be sending further values and it will unsubscribe B immediately. For example: zip(Arrays.asList(range(1, 5).doOnCompleted(action1), range(6, 5).doOnCompleted(action2)), (a) -> a) action1 will be called but action2 won't.

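In other words, don't zip with empty(). Here saveSectors returns Observable.empty(), which completes without emitting anything, so zip has nothing to pair with the contract list and completes right away. You can zip with Observable.<Void>just(null) instead and ignore that value in the zip function.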
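
To make the pairing rule concrete, here is a minimal plain-Java sketch. It is not RxJava: ZipModel and its zip helper are hypothetical names, and each list simply stands in for the items an Observable would emit before completing. Under that assumption, an empty list models Observable.empty() and a one-element list holding null models Observable.<Void>just(null).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical model of zip's pairing rule; lists stand in for emitted items.
public class ZipModel {

    // Pairs items positionally and stops as soon as either source runs out.
    public static <A, B, R> List<R> zip(Iterable<A> first, Iterable<B> second,
                                        BiFunction<A, B, R> combiner) {
        List<R> results = new ArrayList<>();
        Iterator<A> a = first.iterator();
        Iterator<B> b = second.iterator();
        while (a.hasNext() && b.hasNext()) {
            results.add(combiner.apply(a.next(), b.next()));
        }
        return results;
    }

    public static void main(String[] args) {
        // Stands in for employeesInsert, which emits one contract list.
        List<List<String>> contractSource =
                Collections.singletonList(Arrays.asList("c1", "c2"));

        // Models Observable.empty(): no items, so the combiner never runs.
        List<Void> emptySource = Collections.<Void>emptyList();
        // Models Observable.<Void>just(null): one (null) item to pair with.
        List<Void> singleNull = Collections.<Void>singletonList(null);

        List<Integer> viaEmpty = zip(emptySource, contractSource, (ignored, c) -> c.size());
        List<Integer> viaNull = zip(singleNull, contractSource, (ignored, c) -> c.size());

        System.out.println(viaEmpty.size()); // prints 0
        System.out.println(viaNull);         // prints [2]
    }
}
```

In the question's code, this corresponds to ending saveSectors with return Observable.<Void>just(null); rather than return Observable.empty();, so that the zip function receives one item from each source and saveContracts runs.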

A similar question on this topic can be found on Stack Overflow: https://stackoverflow.com/questions/38124652/
