gpt4 book ai didi

java - 并行流调用 Spliterator 的次数超过其限制

转载 作者:塔克拉玛干 更新时间:2023-11-01 23:02:44 26 4
gpt4 key购买 nike

我最近发现了一个错误

StreamSupport.intStream(/* a Spliterator.ofInt */, true)
.limit(20)

调用 Spliterator.ofInt.tryAdvance 超过 20 次。当我把它改成

StreamSupport.intStream(/* a Spliterator.ofInt */, true)
.sequential()
.limit(20)

问题消失了。为什么会这样?当 tryAdvance 有副作用时,除了在 Spliterator 中构建一个副作用之外,是否有任何方法可以对并行流实现严格限制? (这是为了测试一些返回无限流的方法,但测试需要在没有复杂的“X 毫秒循环”构造的情况下到达最终终点。)

最佳答案

关于 limittrySplit 应该如何交互似乎存在根本性的误解。假设 trySplit 调用不应超过指定的 limit,这是完全错误的。

trySplit 的目的是将源数据分成两部分,在最好的情况下分成两部分一半,因为trySplit 是假设的尝试平衡拆分。因此,如果您有一个包含 100 万个元素的源数据集,成功拆分会产生两个各有 50 万个元素的源数据集。这与您可能已应用于流的 limit(20) 完全无关,除非我们事先知道,如果拆分器具有 SIZED,我们可以删除第二个数据集|SUBSIZED 特性,因为请求的 二十个元素只能在前五十万内找到。

很容易计算出 在最好的情况下,即平衡拆分,我们已经需要十五次拆分操作,每次都删除上半部分,然后才能在第一个之间进行拆分允许我们并行处理前二十个元素的二十个元素。

这很容易证明:

class DebugSpliterator extends Spliterators.AbstractIntSpliterator {
int current, fence;
DebugSpliterator() {
this(0, 1_000_000);
}
DebugSpliterator(int start, int end) {
super(end-start, ORDERED|SIZED|SUBSIZED);
current = start;
fence = end;
}
@Override public boolean tryAdvance(IntConsumer action) {
if(current<fence) {
action.accept(current++);
return true;
}
return false;
}
@Override public OfInt trySplit() {
int mid = (current+fence)>>>1;
System.out.println("trySplit() ["+current+", "+mid+", "+fence+"]");
return mid>current? new DebugSpliterator(current, current=mid): null;
}
}
StreamSupport.stream(new DebugSpliterator(), true)
.limit(20)
.forEach(x -> {});

在我的机器上,它打印:

trySplit() [0, 500000, 1000000]
trySplit() [0, 250000, 500000]
trySplit() [0, 125000, 250000]
trySplit() [0, 62500, 125000]
trySplit() [0, 31250, 62500]
trySplit() [0, 15625, 31250]
trySplit() [0, 7812, 15625]
trySplit() [0, 3906, 7812]
trySplit() [0, 1953, 3906]
trySplit() [0, 976, 1953]
trySplit() [0, 488, 976]
trySplit() [0, 244, 488]
trySplit() [0, 122, 244]
trySplit() [0, 61, 122]
trySplit() [0, 30, 61]
trySplit() [0, 15, 30]
trySplit() [15, 22, 30]
trySplit() [15, 18, 22]
trySplit() [15, 16, 18]
trySplit() [16, 17, 18]
trySplit() [0, 7, 15]
trySplit() [18, 20, 22]
trySplit() [18, 19, 20]
trySplit() [7, 11, 15]
trySplit() [0, 3, 7]
trySplit() [3, 5, 7]
trySplit() [3, 4, 5]
trySplit() [7, 9, 11]
trySplit() [4, 4, 5]
trySplit() [9, 10, 11]
trySplit() [11, 13, 15]
trySplit() [0, 1, 3]
trySplit() [13, 14, 15]
trySplit() [7, 8, 9]
trySplit() [1, 2, 3]
trySplit() [8, 8, 9]
trySplit() [5, 6, 7]
trySplit() [14, 14, 15]
trySplit() [17, 17, 18]
trySplit() [11, 12, 13]
trySplit() [12, 12, 13]
trySplit() [2, 2, 3]
trySplit() [10, 10, 11]
trySplit() [6, 6, 7]

当然,这远远超过 20 次拆分尝试,但完全合理,因为必须拆分数据集,直到我们在所需目标范围内有子范围才能并行处理它。

我们可以通过删除导致此执行策略的元信息来强制执行不同的行为:

StreamSupport.stream(new DebugSpliterator(), true)
.filter(x -> true)
.limit(20)
.forEach(x -> {});

由于 Stream API 不知道谓词的行为,管道失去了它的 SIZED 特性,导致了

trySplit() [0, 500000, 1000000]
trySplit() [500000, 750000, 1000000]
trySplit() [500000, 625000, 750000]
trySplit() [625000, 687500, 750000]
trySplit() [625000, 656250, 687500]
trySplit() [656250, 671875, 687500]
trySplit() [0, 250000, 500000]
trySplit() [750000, 875000, 1000000]
trySplit() [250000, 375000, 500000]
trySplit() [0, 125000, 250000]
trySplit() [250000, 312500, 375000]
trySplit() [312500, 343750, 375000]
trySplit() [125000, 187500, 250000]
trySplit() [875000, 937500, 1000000]
trySplit() [375000, 437500, 500000]
trySplit() [125000, 156250, 187500]
trySplit() [250000, 281250, 312500]
trySplit() [750000, 812500, 875000]
trySplit() [281250, 296875, 312500]
trySplit() [156250, 171875, 187500]
trySplit() [437500, 468750, 500000]
trySplit() [0, 62500, 125000]
trySplit() [875000, 906250, 937500]
trySplit() [62500, 93750, 125000]
trySplit() [812500, 843750, 875000]
trySplit() [906250, 921875, 937500]
trySplit() [0, 31250, 62500]
trySplit() [31250, 46875, 62500]
trySplit() [46875, 54687, 62500]
trySplit() [54687, 58593, 62500]
trySplit() [58593, 60546, 62500]
trySplit() [60546, 61523, 62500]
trySplit() [61523, 62011, 62500]
trySplit() [62011, 62255, 62500]

显示较少的 trySplit 调用,但不是改进;查看数字表明现在超出结果元素范围的范围(如果我们使用我们的知识所有元素都将通过过滤器)被处理,更糟糕的是,结果元素的范围完全被单个拆分器覆盖,导致没有并行在处理我们的结果元素时,所有其他线程都在处理随后被丢弃的元素。

当然,我们可以很容易地通过改变

int mid = (current+fence)>>>1;

int mid = fence>20? 20: (current+fence)>>>1;

所以

StreamSupport.stream(new DebugSpliterator(), true)
.limit(20)
.forEach(x -> {});

结果

trySplit() [0, 20, 1000000]
trySplit() [0, 10, 20]
trySplit() [10, 15, 20]
trySplit() [10, 12, 15]
trySplit() [12, 13, 15]
trySplit() [0, 5, 10]
trySplit() [15, 17, 20]
trySplit() [5, 7, 10]
trySplit() [0, 2, 5]
trySplit() [17, 18, 20]
trySplit() [2, 3, 5]
trySplit() [5, 6, 7]
trySplit() [15, 16, 17]
trySplit() [6, 6, 7]
trySplit() [16, 16, 17]
trySplit() [0, 1, 2]
trySplit() [7, 8, 10]
trySplit() [8, 9, 10]
trySplit() [1, 1, 2]
trySplit() [3, 4, 5]
trySplit() [9, 9, 10]
trySplit() [18, 19, 20]
trySplit() [10, 11, 12]
trySplit() [13, 14, 15]
trySplit() [11, 11, 12]
trySplit() [4, 4, 5]
trySplit() [14, 14, 15]

但这不是一个通用的拆分器,而是一个如果限制不是二十个则性能很差的拆分器。

如果我们可以将限制合并到拆分器中,或者更一般地说,合并到流源中,我们就不会有这个问题。因此,您可以调用 list.subList(0, Math.min(x, list.size())).stream() 而不是 list.stream().limit(x) ,而不是 random.ints().limit(x),使用 random.ints(x),而不是 Stream.generate(generator ).limit(x) 你可以使用 LongStream.range(0, x).mapToObj( index -> generator.get()) 或者使用 this answer 的工厂方法.

对于任意流源/拆分器,应用 limit 对于并行流来说可能非常昂贵,即 even documented .好吧,在 trySplit 中产生副作用从一开始就是个坏主意。

关于java - 并行流调用 Spliterator 的次数超过其限制,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46535831/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com