gpt4 book ai didi

Java 并行流在 Set 上无法正常工作

转载 作者:行者123 更新时间:2023-12-01 16:47:24 24 4
gpt4 key购买 nike

一点背景知识,我尝试使用 Java 8 并行流以异步方式调用多个 API。我希望调用每个 API,然后阻塞,直到返回所有 API。我遇到了一个有趣的情况,如果我尝试流式传输 map (而不是列表),API 将不再在新线程中被调用。

如果我运行以下代码,每个服务都会在新线程中调用:

    List<GitUser> result1 = Arrays.asList(service1, service2, service3).parallelStream()
.map(s->s.getGitUser())
.collect(Collectors.toList());

但是,如果我使用 map 来完成相同的任务,则每个服务都会同步调用:

    Map<String, ParallelStreamPOCService> map = new HashMap<>();
map.put("1", service1);
map.put("2", service2);
map.put("3", service3);

List<GitUser> result2 = map.entrySet().parallelStream()
.map(s->s.getValue().getGitUser())
.collect(Collectors.toList());

这是服务实现:

    public GitUser getGitUser() {
LOGGER.info("Loading user " + userName);
String url = String.format("https://api.github.com/users/%s", userName);
GitUser results = restTemplate.getForObject(url, GitUser.class);
try {
TimeUnit.SECONDS.sleep(secondsToSleep);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
LOGGER.error("Finished " + userName);
return results;
}

最佳答案

this answer 中所述,这是关于如何分配工作负载的实现细节。 HashMap 有一个内部支持数组,其容量比条目更高(通常)。它根据数组元素进行分割,知道这可能会造成不平衡的分割,因为确定条目如何在数组上分布可能成本很高。

最简单的解决方案是当您知道只有几个元素时,减少 HashSet 的容量(默认容量为 16):

HashMap<Integer,String> map = new HashMap<>();
map.put(0, "foo");
map.put(1, "bar");
map.put(2, "baz");

map.values().parallelStream().forEach(v -> {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(200));
System.out.println(v+"\t"+Thread.currentThread());
});
foo Thread[main,5,main]
bar Thread[main,5,main]
baz Thread[main,5,main]
HashMap<Integer,String> map = new HashMap<>(4);
map.put(0, "foo");
map.put(1, "bar");
map.put(2, "baz");

map.values().parallelStream().forEach(v -> {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(200));
System.out.println(v+"\t"+Thread.currentThread());
});
foo Thread[ForkJoinPool.commonPool-worker-1,5,main]
baz Thread[main,5,main]
bar Thread[ForkJoinPool.commonPool-worker-1,5,main]

请注意,由于舍入问题,它仍然不会为每个元素使用一个线程。如前所述,HashMapSpliterator 不知道元素如何分布在数组上。但它知道总共有三个元素,因此它估计分割后每个工作负载中有一半。三的一半四舍五入为一,因此 Stream 实现假设即使尝试进一步分割这些工作负载也没有任何好处。

除了使用具有更多元素的并行流之外,没有简单的解决方法。不过,仅出于教育目的:

HashMap<Integer,String> map = new HashMap<>(4, 1f);
map.put(0, "foo");
map.put(1, "bar");
map.put(2, "baz");
map.put(3, null);

map.values().parallelStream()
.filter(Objects::nonNull)
.forEach(v -> {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(200));
System.out.println(v+"\t"+Thread.currentThread());
});
bar Thread[ForkJoinPool.commonPool-worker-1,5,main]
baz Thread[main,5,main]
foo Thread[ForkJoinPool.commonPool-worker-2,5,main]

通过插入第四个元素,我们消除了舍入问题。还需要提供 1f 的负载因子,以防止 HashMap 增加容量,这会使我们回到第一个方向(除非我们至少有八个核心)。

这是一个拼凑,因为我们事先知道我们会浪费一个工作线程来检测我们的虚拟 null 条目。但它演示了工作负载分配的工作原理。

map 中包含更多元素会自动消除这些问题。

<小时/>

Stream 不适用于阻塞或 hibernate 的任务。对于此类任务,您应该使用 ExecutorService,它还允许使用比 CPU 核心更多的线程,这对于在整个执行时间内不使用 CPU 核心的任务来说是合理的。

ExecutorService es = Executors.newCachedThreadPool();
List<GitUser> result =
es.invokeAll(
Stream.of(service1, service2, service3)
.<Callable<GitUser>>map(s -> s::getGitUser)
.collect(Collectors.toList())
) .stream()
.map(future -> {
try { return future.get(); }
catch (InterruptedException|ExecutionException ex) {
throw new IllegalStateException(ex);
}
})
.collect(Collectors.toList());

关于Java 并行流在 Set 上无法正常工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47562790/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com