gpt4 book ai didi

android - 使 RxJava 异步任务线程安全

转载 作者:太空狗 更新时间:2023-10-29 13:12:02 26 4
gpt4 key购买 nike

在我的应用程序中,我有一项服务可以跟踪用户位置,然后使用 RxJava 将其发送到服务器。如果请求成功,我会收到插入的 ID,然后我可以从我的本地数据库中删除它们。

  1. 查询数据库以获取要发送的积分
  2. 如果不为空,我会发布所有从数据库中收集到的点
  3. 如果请求成功,我会从数据库中删除发布的积分

我的问题是,我在上一个任务结束之前快速调用该可观察对象,以便服务器接收重复点(两个请求的相同点)。我需要在单线程上执行 Observable 以避免在前一个任务结束之前让另一个任务查询数据库。我创建了一个 Looper 线程,但我仍然发送重复项,但我不知道为什么。服务器请求现在似乎要等到它结束才能执行下一个请求,但在下一个请求中,它仍然发送相同的点!啊啊

    final StoreChangeEvent finalEvent = event;
Observable
.defer(() -> Observable.just(database.getAllPoints()))
.flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList)
.map(result -> deletePoint(result))
.doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage())))
.doOnCompleted(() -> emitStoreChange(finalEvent))
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.from(backgroundLooper)))
.subscribe();

似乎 database.getAllPoints() 被调用得太快了……我应该添加一个 .blocking() 吗?

假设我有 5 个积分要发布到服务器 (A,B,C,D)

  1. 我查询数据库并将 A-B-C-D 发送到服务器
  2. 我从设备接收到另一个点(点 E)
  3. 我查询数据库并发送 (A-B-C-D-E)
  4. 我从服务器收到成功,然后我从本地数据库中删除 A-B-C-D
  5. 我从第二个请求中收到成功,然后我从本地数据库中删除了 A-B-C-D-E

结果:A-B-C-D 在服务器数据库中出现了两次,因为两个请求是用相同的点发送的

最佳答案

选项 1 - 主题

您可以尝试使用 Subject - 一个既充当观察者又充当可观察对象的事物。

订阅它一次并通知它来自其他地方的新事件(onNext())。订阅者将随后处理这些事件。

我使用了 SerializedSubject,以防您从不同的线程调用 notifyNewEvent(),否则您可以使用 BehaviourSubject

SerializedSubject<StoreChangeEvent, StoreChangeEvent> subject = new SerializedSubject(BehaviorSubject.create())

public void initialize() {
// Since you access your incoming event from doOnCompleted,
// need this extra flatMap function so that you can access your event
// outside rx java chain.
subject.flatMap(new Func1() {
@Override
public Observable call(StoreChangeEvent event) {
return Observable
.just(database.getAllPoints())
.flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList)
.map(result -> deletePoint(result))
.doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage())))
.doOnCompleted(() -> emitStoreChange(finalEvent)));
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.from(backgroundLooper))
.subscribe();
}

public void notifyNewEvent(StoreChangeEvent event) {
subject.onNext(event);
}

选项 2 - 执行者

如果您不访问 UI,为什么还要为主题和 observeOn、subscribeOn 而烦恼。创建一个具有一个线程的执行器(所有任务随后执行),并将任务提交给它,在那里利用 RxJava 的有用功能。

ExecutorService executorService = Executors.newSingleThreadExecutor();

public void notifyNewEvent(final StoreChangeEvent event) {
executorService.execute(new Runnable() {
public void run() {
Observable.just(database.getAllPoints())
// Blocking, so that the thread doesn't exit
// and blocks on subscribe() till completion.
.toBlocking()
.flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList)
.map(result -> deletePoint(result))
.doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage())))
.doOnCompleted(() -> emitStoreChange(finalEvent)))
.subscribe();
}
});
}

关于android - 使 RxJava 异步任务线程安全,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39456532/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com