- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
在我的应用程序中,我有一项服务可以跟踪用户位置,然后使用 RxJava
将其发送到服务器。如果请求成功,我会收到插入的 ID,然后我可以从我的本地数据库中删除它们。
我的问题是,我在上一个任务结束之前快速调用该可观察对象,以便服务器接收重复点(两个请求的相同点)。我需要在单线程上执行 Observable
以避免在前一个任务结束之前让另一个任务查询数据库。我创建了一个 Looper
线程,但我仍然发送重复项,但我不知道为什么。服务器请求现在似乎要等到它结束才能执行下一个请求,但在下一个请求中,它仍然发送相同的点!啊啊
final StoreChangeEvent finalEvent = event;
Observable
.defer(() -> Observable.just(database.getAllPoints()))
.flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList)
.map(result -> deletePoint(result))
.doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage())))
.doOnCompleted(() -> emitStoreChange(finalEvent))
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.from(backgroundLooper)))
.subscribe();
似乎 database.getAllPoints()
被调用得太快了……我应该添加一个 .blocking() 吗?
假设我有 5 个积分要发布到服务器 (A,B,C,D)
结果:A-B-C-D 在服务器数据库中出现了两次,因为两个请求是用相同的点发送的
最佳答案
您可以尝试使用 Subject
- 一个既充当观察者又充当可观察对象的事物。
订阅它一次并通知它来自其他地方的新事件(onNext()
)。订阅者将随后处理这些事件。
我使用了 SerializedSubject
,以防您从不同的线程调用 notifyNewEvent()
,否则您可以使用 BehaviourSubject
。
SerializedSubject<StoreChangeEvent, StoreChangeEvent> subject = new SerializedSubject(BehaviorSubject.create())
public void initialize() {
// Since you access your incoming event from doOnCompleted,
// need this extra flatMap function so that you can access your event
// outside rx java chain.
subject.flatMap(new Func1() {
@Override
public Observable call(StoreChangeEvent event) {
return Observable
.just(database.getAllPoints())
.flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList)
.map(result -> deletePoint(result))
.doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage())))
.doOnCompleted(() -> emitStoreChange(finalEvent)));
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.from(backgroundLooper))
.subscribe();
}
public void notifyNewEvent(StoreChangeEvent event) {
subject.onNext(event);
}
如果您不访问 UI,为什么还要为主题和 observeOn、subscribeOn 而烦恼。创建一个具有一个线程的执行器(所有任务随后执行),并将任务提交给它,在那里利用 RxJava 的有用功能。
ExecutorService executorService = Executors.newSingleThreadExecutor();
public void notifyNewEvent(final StoreChangeEvent event) {
executorService.execute(new Runnable() {
public void run() {
Observable.just(database.getAllPoints())
// Blocking, so that the thread doesn't exit
// and blocks on subscribe() till completion.
.toBlocking()
.flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList)
.map(result -> deletePoint(result))
.doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage())))
.doOnCompleted(() -> emitStoreChange(finalEvent)))
.subscribe();
}
});
}
关于android - 使 RxJava 异步任务线程安全,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39456532/
我正在进行 2 个 RX 调用,这些调用相互嵌套且相互依赖。服务器存在问题(由于各种原因目前无法解决),该问题在第二个嵌套调用中返回错误。 在这个问题得到解决之前,我需要确保如果第二次调用返回错误,则
这个问题在这里已经有了答案: How to resolve Duplicate files copied in APK META-INF/rxjava.properties (8 个答案) 关闭 5
我正在尝试将此 RxJava1 代码转换为 RxJava2 public static Observable listFolder(Path dir, String glob) { retur
这个问题在这里已经有了答案: RxJava 1 and RxJava 2 in the same project [duplicate] (1 个回答) How to resolve Duplica
有显示项目的RecyclerViewAdapter(该项目已经映射到数据库中)。 RecyclerViewAdapter 包含对 Presenter 的引用以加载项目。它还包含带有项目 ID 的 Ar
我想弄清楚如何在 Android 中使用 RxJava 将 Realm 对象保存在 Realm 中。到目前为止,结合所有这些的所有示例都是如何从 Realm 读取数据的。我想在 android 中使用
我在日志中收到此错误: Caused by java.lang.ClassCastException: java.net.UnknownHostException cannot be cast to
我有一个 API 服务类,其方法返回 Retrofit 提供的调用。 最近,Rx2Java 引入了 Single,所以我想将 Call 更改为 Single,但我不想更改逻辑。 例如 : 类接口(in
如何使用运算符让我始终获得以前和当前的值?如果可能的话,我想避免在管道外创建状态。 - time -> 1 2 3 4 | | | | Op
我正在努力实现以下目标。我加载了一个对象列表,我想获取稍后放入列表中的值。 首先,我使用 flatmap 将所有值收集到一个数组中(按山顺序),然后当一切完成后,我填充一个适配器。 我无法做的是每
是否可以选择使用 timeout 的变体不发射 Throwable ? 我要 complete事件发出。 最佳答案 您不需要使用 onErrorResumeNext 映射错误。您可以使用以下方法提供备
我们可以在 C# Rx 中异步执行一些代码,如下所示,使用 Observable.Start()。我想知道 RxJava 中的等价物是什么。 void Main() { AddTwoNum
问题:我有一个用户可以输入查询字符串的功能,我制作了 2 个可观察对象,一个用于查询我的本地数据库,另一个用于从 API 获取结果。这两个操作必须并行运行。我需要尽快显示来自数据库的结果,当 API
我正在尝试在 MVVM 中实现 ViewModel,将可观察对象作为“输入流”提供,将观察者作为“输出流”提供以供 View 绑定(bind)。 如果 getUser() 调用成功,下面的代码似乎可以
出于某种原因,我有时想使用 RxOperators 而不是普通的 java 方式来转换数据结构,因为它越来越干净。例如: Observable.from(listOfStrings) .filter(
我是 RxJava 新手,我需要以异步方式使用 Observable 功能。 我还需要使用超时:在我的示例中,我希望每个进程在 1 秒或更短的时间内结束。 这是我现在所做的: public stati
我正在尝试在网络请求期间在UI中显示进度条至少3秒钟。 此答案中描述的相同方法似乎不适用于Single。 RxJava Observable minimum execution time Single
我有一个可观察的(很热),它通过系统进程执行操作,并且我希望也运行一个间隔,直到该进程可观察达到 onComplete。 我看到区间运算符:http://reactivex.io/documentat
好吧,我是 RxJava2 的新手(嗯,我也不了解 RxJava),并且正在尝试使用 RxJava2 和 MVP 结构开发 Android 应用程序。 在该应用程序中,我正在对使用监听器的库进行异步调
如何将单个流拆分为单独的单个流,这样就可以执行以下操作而无需两次计算getUserId()? // getUserId() returns Single getUserId().flatMap { g
我是一名优秀的程序员,十分优秀!