gpt4 book ai didi

java - 使用 RxJava 异步线程实现防止竞争条件

转载 作者:可可西里 更新时间:2023-11-01 10:54:25 26 4
gpt4 key购买 nike

我正在使用 spring 框架 StringRedisTemplate 来更新发生在多个线程中的条目。

public void processSubmission(final String key, final Map<String, String> submissionDTO) {
final String hashKey = String.valueOf(Hashing.MURMUR_HASH.hash(key));
this.stringRedisTemplate.expire(key, 60, TimeUnit.MINUTES);
final HashOperations<String, String, String> ops = this.stringRedisTemplate.opsForHash();
Map<String, String> data = findByKey(key);
String json;
if (data != null) {
data.putAll(submissionDTO);
json = convertSubmission(data);
} else {
json = convertSubmission(submissionDTO);
}
ops.put(key, hashKey, json);
}

在这个 json 条目中,如下所示,

key (assignmentId) -> value (submissionId, status) 

如代码所示,在更新缓存条目之前,我获取当前条目并添加新条目并将它们全部放入。但是由于这个操作可以在多个线程中进行,所以可能会出现竞争条件导致数据丢失的情况。我可以同步上面的方法,但是这将成为 RxJava 实现的并行处理能力的瓶颈,其中 processSubmission 方法是通过 RxJava 在两个异步线程上调用的。

class ProcessSubmission{

@Override
public Observable<Boolean> processSubmissionSet1(List<Submission> submissionList, HttpHeaders requestHeaders) {
return Observable.create(observer -> {
for (final Submission submission : submissionList) {
//Cache entry insert method invoke via this call
final Boolean status = processSubmissionExecutor.processSubmission(submission, requestHeaders);
observer.onNext(status);
}
observer.onCompleted();
});
}

@Override
public Observable<Boolean> processSubmissionSet2(List<Submission> submissionList, HttpHeaders requestHeaders) {
return Observable.create(observer -> {
for (final Submission submission : submissionList) {
//Cache entry insert method invoke via this call
final Boolean status = processSubmissionExecutor.processSubmission(submission, requestHeaders);
observer.onNext(status);
}
observer.onCompleted();
});
}

}

上面将从下面的服务 API 调用。

class MyService{    
public void handleSubmissions(){
final Observable<Boolean> statusObser1 = processSubmission.processSubmissionSet1(subListDtos.get(0), requestHeaders)
.subscribeOn(Schedulers.newThread());
final Observable<Boolean> statusObser2 = processSubmission.processSubmissionSet2(subListDtos.get(1), requestHeaders)
.subscribeOn(Schedulers.newThread());
statusObser1.subscribe();
statusObser2.subscribe();
}
}

因此 handleSubmissions 正在调用每个分配 ID 的多个线程。但随后每个主线程都会创建并调用两个响应式(Reactive) Java 线程,并处理与每个分配关联的提交列表。

在保持 RxJava 实现的性能的同时,我可以防止 redis 进入竞争条件的最佳方法是什么?有没有一种方法可以更有效地执行此 redis 操作?

最佳答案

看起来您只是在最后使用 ops 变量执行 put 操作,您可以隔离需要同步的那个点.

在我所做的简短研究中,我找不到 HashOperations 是否已经是线程安全的)。

但是您可以如何隔离您关心的部分的一个例子是做类似的事情:

public void processSubmission(final String key, final Map<String, String> submissionDTO) {
final String hashKey = String.valueOf(Hashing.MURMUR_HASH.hash(key));
this.stringRedisTemplate.expire(key, 60, TimeUnit.MINUTES);
Map<String, String> data = findByKey(key);
String json;
if (data != null) {
data.putAll(submissionDTO);
json = convertSubmission(data);
} else {
json = convertSubmission(submissionDTO);
}
putThreadSafeValue(key, hashKey, json);
}

并且有一个只为 put 操作同步的方法:

private synchronized void putThreadSafeValue(key, hashKey, json) {
final HashOperations<String, String, String> ops = this.stringRedisTemplate.opsForHash();
ops.put(key, hashKey, json);
}

有很多方法可以做到这一点,但看起来您可以将线程争用限制在 put 操作上。

关于java - 使用 RxJava 异步线程实现防止竞争条件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47117870/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com