
java - Synchronization issue with parallel streams in a reduce operation


I am trying to concatenate strings using a parallel stream.

StringBuffer concat = Arrays.stream(grades)
        .parallel()
        .reduce(
                new StringBuffer(),               // identity
                (sb, s) -> sb.append(s),          // accumulator
                (sb1, sb2) -> sb1.append(sb2)     // combiner
        );

Using a collector (a mutable reduction) would admittedly be the better approach, but I want to know why this does not return the correct result.

For example, String[] grades = { "A", "B" };

The non-parallel version of this pipeline works fine. The result I see is BABA, although it should simply be AB.

I am already using StringBuffer, which is thread-safe, rather than StringBuilder.

I found the same problem with the following code as well.

List<Integer> ages = people
        .stream()
        .parallel()
        .reduce(
                Collections.synchronizedList(new ArrayList<>()),            // identity
                (list, p) -> { list.add(p.getAge()); return list; },        // accumulator
                (list1, list2) -> { list1.addAll(list2); return list1; }    // combiner
        );

Here, too, I am using a synchronized collection, all of whose methods are thread-safe.
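
For reference, here is a minimal sketch of the same pipeline written as a mutable reduction with collect() instead — people and getAge() are as in the snippet above:

List<Integer> ages = people.stream()
        .parallel()
        .collect(
                ArrayList::new,                      // supplier: a fresh list for each partition of the work
                (list, p) -> list.add(p.getAge()),   // accumulator: add into that local list
                List::addAll                         // combiner: merge the partial lists
        );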

I saw this in the Java docs:

However, there are good reasons to prefer a reduce operation over a mutative accumulation such as the above. Not only is a reduction "more abstract" -- it operates on the stream as a whole rather than individual elements -- but a properly constructed reduce operation is inherently parallelizable, so long as the function(s) used to process the elements are associative and stateless. For example, given a stream of numbers for which we want to find the sum, we can write:

int sum = numbers.stream().reduce(0, (x,y) -> x+y);

or:

int sum = numbers.stream().reduce(0, Integer::sum);

These reduction operations can run safely in parallel with almost no modification:

int sum = numbers.parallelStream().reduce(0, Integer::sum);

Reduction parallelizes well because the implementation can operate on subsets of the data in parallel, and then combine the intermediate results to get the final correct answer. (Even if the language had a "parallel for-each" construct, the mutative accumulation approach would still require the developer to provide thread-safe updates to the shared accumulating variable sum, and the required synchronization would then likely eliminate any performance gain from parallelism.) Using reduce() instead removes all of the burden of parallelizing the reduction operation, and the library can provide an efficient parallel implementation with no additional synchronization required.

From this, I understand that parallel reduction is very much possible.
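
In fact, a version of this pipeline that accumulates into immutable Strings (with "" as a true identity and associative concatenation) does return AB even in parallel — a sketch for comparison:

String concat = Arrays.stream(grades)
        .parallel()
        .reduce("", String::concat);   // "AB" regardless of how the work is split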

Am I missing something? Isn't using a thread-safe data structure enough?

Best answer

When you execute new StringBuffer(), you create a reference to a single buffer. When you execute .parallel(), both parallel parts of the stream are handed this same reference and therefore operate on the same mutable buffer. The empty buffer first gets "B" appended, then "A", and is then appended to itself, producing "BABA".
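
The doubling can be reproduced without any stream at all. A minimal sketch of what effectively happens — whether "A" or "B" lands first in the real parallel run depends on thread scheduling, which is why BABA rather than ABAB was observed:

StringBuffer shared = new StringBuffer(); // the single identity object handed to every subtask
shared.append("A");                       // one subtask accumulates into it...
shared.append("B");                       // ...and so does the other, concurrently in the real run
shared.append(shared);                    // the combiner then appends the buffer to itself
System.out.println(shared);               // ABAB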

To perform this kind of operation on a mutable structure such as a StringBuffer, try using .collect() instead:

StringBuffer concat = Arrays.stream(grades)
        .parallel()
        .collect(
                () -> new StringBuffer(),         // supplier: a fresh buffer for each partition of the work
                (sb, s) -> sb.append(s),          // accumulator
                (sb1, sb2) -> sb1.append(sb2)     // combiner
        );
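
For string concatenation specifically, the prepackaged Collectors.joining() wires up the supplier, accumulator and combiner internally (note that it produces a String rather than a StringBuffer):

String concat = Arrays.stream(grades)
        .parallel()
        .collect(Collectors.joining());   // "AB"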

Regarding "java - Synchronization issue with parallel streams in a reduce operation", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/58857305/
