gpt4 book ai didi

java - 使用 Java 8 流和 CompletableFuture 的并行数据库调用

转载 作者:行者123 更新时间:2023-11-30 02:53:24 25 4
gpt4 key购买 nike

我想使用 Java 8 流复制和并行化以下行为:

for (animal : animalList) {
// find all other animals with the same breed
Collection<Animal> queryResult = queryDatabase(animal.getBreed());

if (animal.getSpecie() == cat) {
catList.addAll(queryResult);
} else {
dogList.addAll(queryResult);
}
}

这就是我目前所拥有的

final Executor queryExecutor =
Executors.newFixedThreadPool(Math.min(animalList.size(), 10),
new ThreadFactory(){
public Thread newThread(Runnable r){
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});

List<CompletableFuture<Collection<Animal>>> listFutureResult = animalList.stream()
.map(animal -> CompletableFuture.supplyAsync(
() -> queryDatabase(animal.getBreed()), queryExecutor))
.collect(Collectors.toList());

List<Animal> = listFutureResult.stream()
.map(CompletableFuture::join)
.flatMap(subList -> subList.stream())
.collect(Collectors.toList());

1 - 我不知道如何分割流,以便我可以获得 2 个不同的动物列表,一个用于猫,一个用于狗。

2 - 这个解决方案看起来合理吗?

最佳答案

首先,考虑仅使用

List<Animal> result = animalList.parallelStream()
.flatMap(animal -> queryDatabase(animal.getBreed()).stream())
.collect(Collectors.toList());

即使它不能为您提供所需的最多 10 个并发。简单性可能会弥补这一点。至于其他部分,很简单

Map<Boolean,List<Animal>> result = animalList.parallelStream()
.flatMap(animal -> queryDatabase(animal.getBreed()).stream())
.collect(Collectors.partitioningBy(animal -> animal.getSpecie() == cat));
List<Animal> catList = result.get(true), dogList = result.get(false);

如果您拥有的物种不仅仅是猫和狗,您可以使用Collectors.groupingBy(Animal::getSpecie) 获取从物种到动物列表的 map 。

<小时/>

如果您坚持使用自己的线程池,则可以改进一些事情:

Executor queryExecutor = Executors.newFixedThreadPool(Math.min(animalList.size(), 10),
r -> {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
});
List<Animal> result = animalList.stream()
.map(animal -> CompletableFuture.completedFuture(animal.getBreed())
.thenApplyAsync(breed -> queryDatabase(breed), queryExecutor))
.collect(Collectors.toList()).stream()
.flatMap(cf -> cf.join().stream())
.collect(Collectors.toList());

您的 supplyAsync 变体需要捕获实际的 Animal 实例,为每个动物创建一个新的 Supplier。相反,传递给 thenApplyAsync 的函数是不变的,对每个参数值执行相同的操作。上面的代码假设 getBreed 是一个廉价的操作,否则,将 Animal 实例传递给 completedFuture 并执行 getBreed() 改为异步函数。

.map(CompletableFuture::join) 可以替换为 flatMap 函数中的简单链式 .join()。否则,如果您更喜欢方法引用,则应该一致地使用它们,即 .map(CompletableFuture::join).flatMap(Collection::stream)

当然,此变体还允许使用 partitioningBy 而不是 toList

<小时/>

最后一点,如果您在使用后对执行程序服务调用shutdown,则无需将线程标记为守护进程:

ExecutorService queryExecutor=Executors.newFixedThreadPool(Math.min(animalList.size(),10));
Map<Boolean,List<Animal>> result = animalList.stream()
.map(animal -> CompletableFuture.completedFuture(animal.getBreed())
.thenApplyAsync(breed -> queryDatabase(breed), queryExecutor))
.collect(Collectors.toList()).stream()
.flatMap(cf -> cf.join().stream())
.collect(Collectors.partitioningBy(animal -> animal.getSpecie() == cat));
List<Animal> catList = result.get(true), dogList = result.get(false);
queryExecutor.shutdown();

关于java - 使用 Java 8 流和 CompletableFuture 的并行数据库调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37979436/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com