gpt4 book ai didi

java - 有没有办法强制 parallelStream() 并行?

转载 作者:塔克拉玛干 更新时间:2023-11-03 04:50:13 27 4
gpt4 key购买 nike

如果输入大小太小,库 automatically serializes the execution of the maps in the stream ,但这种自动化没有也不能考虑 map 操作的繁重程度。有没有办法强制 parallelStream() 真正并行化 CPU heavy 映射?

最佳答案

似乎存在根本性的误解。链接的问答讨论了流显然不能并行工作,因为 OP 没有看到预期的加速。结论是,如果工作负载太小,并行处理没有任何好处不会自动回退到顺序执行。

其实恰恰相反。如果你请求并行,你就会得到并行,即使它实际上降低了性能。在这种情况下,实现不会切换到可能更高效的顺序执行。

因此,如果您确信每个元素的工作负载足够高以证明使用并行执行是合理的,而不管元素的数量很少,您可以简单地请求并行执行。

可以很容易地证明:

Stream.of(1, 2).parallel()
.peek(x -> System.out.println("processing "+x+" in "+Thread.currentThread()))
.forEach(System.out::println);

On Ideone , 它打印

processing 2 in Thread[main,5,main]
2
processing 1 in Thread[ForkJoinPool.commonPool-worker-1,5,main]
1

但消息的顺序和详细信息可能会有所不同。甚至有可能在某些环境中,这两个任务可能碰巧由同一​​个线程执行,如果它可以在另一个线程开始接收它之前确定第二个任务。但当然,如果任务足够昂贵,就不会发生这种情况。重要的一点是,整体工作负载已被拆分并入队,以便有可能被其他工作线程接收。

对于上面的简单示例,如果在您的环境中发生单线程执行,您可以像这样插入模拟工作负载:

Stream.of(1, 2).parallel()
.peek(x -> System.out.println("processing "+x+" in "+Thread.currentThread()))
.map(x -> {
LockSupport.parkNanos("simulated workload", TimeUnit.SECONDS.toNanos(3));
return x;
})
.forEach(System.out::println);

然后,您可能还会看到,如果“每个元素的处理时间”足够长。


更新:误解可能是由 Brian Goetz 的误导性陈述引起的:“在您的情况下,您的输入集太小而无法分解”。

必须强调的是,这不是 Stream API 的一般属性,而是 Map已被使用。 HashMap有一个后备数组,条目根据它们的哈希码分布在该数组中。将数组拆分为 n 范围可能不会导致包含元素的平衡拆分,尤其是当只有两个时。 HashMap 的实现者的 Spliterator认为在数组中搜索元素以获得完美平衡的拆分过于昂贵,并不是说拆分两个元素不值得。

HashMap的默认容量是16而这个例子只有两个元素,我们可以说 map 太大了。简单地解决这个问题也会解决这个例子:

long start = System.nanoTime();

Map<String, Supplier<String>> input = new HashMap<>(2);
input.put("1", () -> {
System.out.println(Thread.currentThread());
LockSupport.parkNanos("simulated workload", TimeUnit.SECONDS.toNanos(2));
return "a";
});
input.put("2", () -> {
System.out.println(Thread.currentThread());
LockSupport.parkNanos("simulated workload", TimeUnit.SECONDS.toNanos(2));
return "b";
});
Map<String, String> results = input.keySet()
.parallelStream().collect(Collectors.toConcurrentMap(
key -> key,
key -> input.get(key).get()));

System.out.println("Time: " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime()- start));

在我的机器上,它打印

Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Time: 2058

结论是 Stream 实现总是尝试使用并行执行,如果您请求它,无论输入大小如何。但这取决于输入的结构如何将工作负载分配给工作线程。事情可能更糟,例如如果您从文件流式传输行。

如果您认为平衡拆分的好处值得复制步骤的成本,您也可以使用 new ArrayList<>(input.keySet()).parallelStream()而不是 input.keySet().parallelStream() , 作为元素在 ArrayList 中的分布始终允许完美平衡的拆分。

关于java - 有没有办法强制 parallelStream() 并行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44800027/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com