- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我正在使用 spring 框架 StringRedisTemplate 来更新发生在多个线程中的条目。
public void processSubmission(final String key, final Map<String, String> submissionDTO) {
final String hashKey = String.valueOf(Hashing.MURMUR_HASH.hash(key));
this.stringRedisTemplate.expire(key, 60, TimeUnit.MINUTES);
final HashOperations<String, String, String> ops = this.stringRedisTemplate.opsForHash();
Map<String, String> data = findByKey(key);
String json;
if (data != null) {
data.putAll(submissionDTO);
json = convertSubmission(data);
} else {
json = convertSubmission(submissionDTO);
}
ops.put(key, hashKey, json);
}
在这个 json 条目中,如下所示,
key (assignmentId) -> value (submissionId, status)
如代码所示,在更新缓存条目之前,我获取当前条目并添加新条目并将它们全部放入。但是由于这个操作可以在多个线程中进行,所以可能会出现竞争条件导致数据丢失的情况。我可以同步上面的方法,但是这将成为 RxJava 实现的并行处理能力的瓶颈,其中 processSubmission 方法是通过 RxJava 在两个异步线程上调用的。
class ProcessSubmission{
@Override
public Observable<Boolean> processSubmissionSet1(List<Submission> submissionList, HttpHeaders requestHeaders) {
return Observable.create(observer -> {
for (final Submission submission : submissionList) {
//Cache entry insert method invoke via this call
final Boolean status = processSubmissionExecutor.processSubmission(submission, requestHeaders);
observer.onNext(status);
}
observer.onCompleted();
});
}
@Override
public Observable<Boolean> processSubmissionSet2(List<Submission> submissionList, HttpHeaders requestHeaders) {
return Observable.create(observer -> {
for (final Submission submission : submissionList) {
//Cache entry insert method invoke via this call
final Boolean status = processSubmissionExecutor.processSubmission(submission, requestHeaders);
observer.onNext(status);
}
observer.onCompleted();
});
}
}
上面将从下面的服务 API 调用。
class MyService{
public void handleSubmissions(){
final Observable<Boolean> statusObser1 = processSubmission.processSubmissionSet1(subListDtos.get(0), requestHeaders)
.subscribeOn(Schedulers.newThread());
final Observable<Boolean> statusObser2 = processSubmission.processSubmissionSet2(subListDtos.get(1), requestHeaders)
.subscribeOn(Schedulers.newThread());
statusObser1.subscribe();
statusObser2.subscribe();
}
}
因此 handleSubmissions 正在调用每个分配 ID 的多个线程。但随后每个主线程都会创建并调用两个响应式(Reactive) Java 线程,并处理与每个分配关联的提交列表。
在保持 RxJava 实现的性能的同时,我可以防止 redis 进入竞争条件的最佳方法是什么?有没有一种方法可以更有效地执行此 redis 操作?
最佳答案
看起来您只是在最后使用 ops
变量执行 put
操作,您可以隔离需要同步的那个点.
在我所做的简短研究中,我找不到 HashOperations
是否已经是线程安全的)。
但是您可以如何隔离您关心的部分的一个例子是做类似的事情:
public void processSubmission(final String key, final Map<String, String> submissionDTO) {
final String hashKey = String.valueOf(Hashing.MURMUR_HASH.hash(key));
this.stringRedisTemplate.expire(key, 60, TimeUnit.MINUTES);
Map<String, String> data = findByKey(key);
String json;
if (data != null) {
data.putAll(submissionDTO);
json = convertSubmission(data);
} else {
json = convertSubmission(submissionDTO);
}
putThreadSafeValue(key, hashKey, json);
}
并且有一个只为 put
操作同步的方法:
private synchronized void putThreadSafeValue(key, hashKey, json) {
final HashOperations<String, String, String> ops = this.stringRedisTemplate.opsForHash();
ops.put(key, hashKey, json);
}
有很多方法可以做到这一点,但看起来您可以将线程争用限制在 put
操作上。
关于java - 使用 RxJava 异步线程实现防止竞争条件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47117870/
我正在进行 2 个 RX 调用,这些调用相互嵌套且相互依赖。服务器存在问题(由于各种原因目前无法解决),该问题在第二个嵌套调用中返回错误。 在这个问题得到解决之前,我需要确保如果第二次调用返回错误,则
这个问题在这里已经有了答案: How to resolve Duplicate files copied in APK META-INF/rxjava.properties (8 个答案) 关闭 5
我正在尝试将此 RxJava1 代码转换为 RxJava2 public static Observable listFolder(Path dir, String glob) { retur
这个问题在这里已经有了答案: RxJava 1 and RxJava 2 in the same project [duplicate] (1 个回答) How to resolve Duplica
有显示项目的RecyclerViewAdapter(该项目已经映射到数据库中)。 RecyclerViewAdapter 包含对 Presenter 的引用以加载项目。它还包含带有项目 ID 的 Ar
我想弄清楚如何在 Android 中使用 RxJava 将 Realm 对象保存在 Realm 中。到目前为止,结合所有这些的所有示例都是如何从 Realm 读取数据的。我想在 android 中使用
我在日志中收到此错误: Caused by java.lang.ClassCastException: java.net.UnknownHostException cannot be cast to
我有一个 API 服务类,其方法返回 Retrofit 提供的调用。 最近,Rx2Java 引入了 Single,所以我想将 Call 更改为 Single,但我不想更改逻辑。 例如 : 类接口(in
如何使用运算符让我始终获得以前和当前的值?如果可能的话,我想避免在管道外创建状态。 - time -> 1 2 3 4 | | | | Op
我正在努力实现以下目标。我加载了一个对象列表,我想获取稍后放入列表中的值。 首先,我使用 flatmap 将所有值收集到一个数组中(按山顺序),然后当一切完成后,我填充一个适配器。 我无法做的是每
是否可以选择使用 timeout 的变体不发射 Throwable ? 我要 complete事件发出。 最佳答案 您不需要使用 onErrorResumeNext 映射错误。您可以使用以下方法提供备
我们可以在 C# Rx 中异步执行一些代码,如下所示,使用 Observable.Start()。我想知道 RxJava 中的等价物是什么。 void Main() { AddTwoNum
问题:我有一个用户可以输入查询字符串的功能,我制作了 2 个可观察对象,一个用于查询我的本地数据库,另一个用于从 API 获取结果。这两个操作必须并行运行。我需要尽快显示来自数据库的结果,当 API
我正在尝试在 MVVM 中实现 ViewModel,将可观察对象作为“输入流”提供,将观察者作为“输出流”提供以供 View 绑定(bind)。 如果 getUser() 调用成功,下面的代码似乎可以
出于某种原因,我有时想使用 RxOperators 而不是普通的 java 方式来转换数据结构,因为它越来越干净。例如: Observable.from(listOfStrings) .filter(
我是 RxJava 新手,我需要以异步方式使用 Observable 功能。 我还需要使用超时:在我的示例中,我希望每个进程在 1 秒或更短的时间内结束。 这是我现在所做的: public stati
我正在尝试在网络请求期间在UI中显示进度条至少3秒钟。 此答案中描述的相同方法似乎不适用于Single。 RxJava Observable minimum execution time Single
我有一个可观察的(很热),它通过系统进程执行操作,并且我希望也运行一个间隔,直到该进程可观察达到 onComplete。 我看到区间运算符:http://reactivex.io/documentat
好吧,我是 RxJava2 的新手(嗯,我也不了解 RxJava),并且正在尝试使用 RxJava2 和 MVP 结构开发 Android 应用程序。 在该应用程序中,我正在对使用监听器的库进行异步调
如何将单个流拆分为单独的单个流,这样就可以执行以下操作而无需两次计算getUserId()? // getUserId() returns Single getUserId().flatMap { g
我是一名优秀的程序员,十分优秀!