gpt4 book ai didi

Java Spliterator 不断拆分并行流

转载 作者:行者123 更新时间:2023-12-03 14:20:51 26 4
gpt4 key购买 nike

我发现 Java 并行流有一些令人惊讶的行为。我自己制作了Spliterator ,并且生成的并行流被分割,直到每个流中只有一个元素。这似乎太小了,我想知道我做错了什么。我希望我可以设置一些特征来纠正这个问题。
这是我的测试代码。 Float这只是一个虚拟的有效载荷,我真正的流类要复杂一些。

   public static void main( String[] args ) {
TestingSpliterator splits = new TestingSpliterator( 10 );
Stream<Float> test = StreamSupport.stream( splits, true );
double total = test.mapToDouble( Float::doubleValue ).sum();
System.out.println( "Total: " + total );
}
此代码将不断拆分此流,直到每个 Spliterator只有一个元素。这似乎太高效了。
输出:
run:
Split on count: 10
Split on count: 5
Split on count: 3
Split on count: 5
Split on count: 2
Split on count: 2
Split on count: 3
Split on count: 2
Split on count: 2
Total: 5.164293184876442
BUILD SUCCESSFUL (total time: 0 seconds)
这是 Spliterator 的代码.我主要关心的是我应该使用什么特性,但也许其他地方有问题?
public class TestingSpliterator implements Spliterator<Float> {
int count;
int splits;

public TestingSpliterator( int count ) {
this.count = count;
}

@Override
public boolean tryAdvance( Consumer<? super Float> cnsmr ) {
if( count > 0 ) {
cnsmr.accept( (float)Math.random() );
count--;
return true;
} else
return false;
}

@Override
public Spliterator<Float> trySplit() {
System.err.println( "Split on count: " + count );
if( count > 1 ) {
splits++;
int half = count / 2;
TestingSpliterator newSplit = new TestingSpliterator( count - half );
count = half;
return newSplit;
} else
return null;
}

@Override
public long estimateSize() {
return count;
}

@Override
public int characteristics() {
return IMMUTABLE | SIZED;
}
}
那么我怎样才能让流被分成更大的 block 呢?我希望在 10,000 到 50,000 附近会更好。
我知道我可以返回 null来自 trySplit()方法,但这似乎是一种倒退的方式。系统似乎应该对核心数量、当前负载以及使用流的代码的复杂程度有一些概念,并相应地进行自我调整。换句话说,我希望流 block 大小是外部配置的,而不是由流本身在内部固定的。
编辑:重新。 Holger 在下面的回答是,当我增加原始流中的元素数量时,流拆分会少一些,所以 StreamSupport最终会停止 split 。
在 100 个元素的初始流大小时, StreamSupport当流大小达到 2 时停止拆分(我在屏幕上看到的最后一行是 Split on count: 4 )。
对于 1000 个元素的初始流大小,各个流 block 的最终大小约为 32 个元素。

编辑部分 deux:查看上面的输出后,我更改了代码以列出单个 Spliterator s 创建。以下是更改:
   public static void main( String[] args ) {
TestingSpliterator splits = new TestingSpliterator( 100 );
Stream<Float> test = StreamSupport.stream( splits, true );
double total = test.mapToDouble( Float::doubleValue ).sum();
System.out.println( "Total Spliterators: " + testers.size() );
for( TestingSpliterator t : testers ) {
System.out.println( "Splits: " + t.splits );
}
}
TestingSpliterator的 Actor :
   static Queue<TestingSpliterator> testers = new ConcurrentLinkedQueue<>();

public TestingSpliterator( int count ) {
this.count = count;
testers.add( this ); // OUCH! 'this' escape
}
这段代码的结果是第一个 Spliterator被拆分 5 次。下一个 Spliterator被拆分 4 次。下一组 Spliterators split 3次。等等。结果是 36 Spliterators制作完成,流被分成尽可能多的部分。在典型的桌面系统上,这似乎是 API 认为最适合并行操作的方式。
我将在下面接受 Holger 的回答,本质上是 StreamSupport类(class)正在做正确的事,不要担心,要快乐。对我来说,部分问题是我正在对非常小的流大小进行早期测试,我对拆分的数量感到惊讶。不要自己犯同样的错误。

最佳答案

你从错误的角度看它。该实现没有拆分“直到每个拆分器都有一个元素”,而是拆分“直到有十个拆分器”。
单个拆分器实例只能由一个线程处理。拆分器在开始遍历后不需要支持拆分。因此,任何事先未使用的拆分机会都可能导致之后的并行处理能力受限。
请务必记住,Stream 实现收到了 ToDoubleFunction。工作量未知¹。不知道就这么简单Float::doubleValue在你的情况下。它可能是一个需要花一分钟时间评估的函数,然后每个 CPU 核心都有一个分离器是正确的。即使拥有多个 CPU 内核也是一种有效的策略,可以处理某些评估花费的时间明显长于其他评估的可能性。
初始拆分器的典型数量将是“CPU 核心数”× 4,尽管稍后当更多关于实际工作负载的知识存在时,这里可能会进行更多拆分操作。当您的输入数据少于该数字时,将其拆分直到每个拆分器只剩下一个元素就不足为奇了。
你可以试试new TestingSpliterator( 10000 )1000100一旦实现假设有足够的 block 来保持所有 CPU 内核忙碌,就会看到拆分的数量不会发生显着变化。
由于您的拆分器也不了解消费流的每个元素的工作负载,因此您不必担心这一点。如果您可以顺利地支持拆分为单个元素,那就这样做吧。
¹ 但是,对于没有操作被链接的情况,它没有特殊的优化。

关于Java Spliterator 不断拆分并行流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66699742/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com