gpt4 book ai didi

java - 为什么不能并行减少流流?/流已经被操作或关闭

转载 作者:行者123 更新时间:2023-12-03 22:54:13 26 4
gpt4 key购买 nike

上下文

我偶然发现了一个相当烦人的问题:我有一个程序,它有很多数据源,能够流式传输相同类型的元素,我想“映射”程序中的每个可用元素(元素顺序没关系)。

因此我尝试减少我的 Stream<Stream<T>> streamOfStreamOfT;变成一个简单的Stream<T> streamOfT;使用 streamOfT = streamOfStreamOfT.reduce(Stream.empty(), Stream::concat);

因为元素顺序对我来说并不重要,所以我尝试使用 .parallel() 并行化 reduce 操作。 : streamOfT = streamOfStreamOfT.parallel().reduce(Stream.empty(), Stream::concat);但这会触发 java.lang.IllegalStateException: stream has already been operated upon or closed

例子

要亲自体验,只需通过评论/取消评论 .parallel() 来玩以下主要 (java 1.8u20)

public static void main(String[] args) {
// GIVEN
List<Stream<Integer>> listOfStreamOfInts = new ArrayList<>();
for (int j = 0; j < 10; j++) {
IntStream intStreamOf10Ints = IntStream.iterate(0, i -> i + 1)
.limit(10);
Stream<Integer> genericStreamOf10Ints = StreamSupport.stream(
intStreamOf10Ints.spliterator(), true);
listOfStreamOfInts.add(genericStreamOf10Ints);
}
Stream<Stream<Integer>> streamOfStreamOfInts = listOfStreamOfInts
.stream();
// WHEN
Stream<Integer> streamOfInts = streamOfStreamOfInts
// ////////////////
// PROBLEM
// |
// V
.parallel()
.reduce(Stream.empty(), Stream::concat);

// THEN
System.out.println(streamOfInts.map(String::valueOf).collect(
joining(", ")));
}

问题

有人可以解释这个限制吗? /找到一种更好的方法来处理流的并行减少


编辑 1

在@Smutje 和@LouisWasserman 评论之后,似乎 .flatMap(Function.identity())是容忍 .parallel() 的更好选择流

最佳答案

您使用的reduce 形式采用标识 和关联组合函数。但是 Stream.empty() 不是一个值;它有状态。流不是数组或集合之类的数据结构;它们是通过可能并行的聚合操作推送数据的载体,并且它们具有某种状态(例如流是否已被消耗)。想想这是如何工作的;您将构建一棵树,其中相同的“空”流出现在多个叶子中。当您尝试两次使用此有状态的非身份(不会按顺序发生,但会并行发生)时,您第二次尝试遍历该空流时,将非常正确地看到它已被使用.

所以问题是,您只是错误地使用了这个 reduce 方法。问题不在于并行性;只是并行性暴露了潜在的问题。

其次,即使这按照您认为应该的方式“工作”,您也只能并行构建表示扁平流的树;当您进行连接时,那里是一个顺序流管道。糟糕。

第三,即使这按照您认为应该的方式“工作”,您将通过构建串联流来增加大量元素访问开销,并且您不会获得并行性的好处你在寻找。

简单的答案是展平流:

String joined = streamOfStreams.parallel()
.flatMap(s -> s)
.collect(joining(", "));

关于java - 为什么不能并行减少流流?/流已经被操作或关闭,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25412377/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com