gpt4 book ai didi

java - StreamEx.parallel().forEach() 在 .map() 之后不并行运行

转载 作者:塔克拉玛干 更新时间:2023-11-03 04:19:45 25 4
gpt4 key购买 nike

我注意到,如果我使用 StreamEx 库通过自定义 ForkJoinPool 并行处理我的流,如下所示 - 后续操作会在该池的并行线程中运行。但是,如果我添加一个 map() 操作并并行生成流 - 仅使用池中的一个线程。

下面是演示此问题的最小工作示例的完整代码(没有所有导入)。 executeAsParallelFromList() 和 executeAsParallelAfterMap() 方法之间的唯一区别是在 .parallel() 之前添加了 .map(...) 调用。

import one.util.streamex.StreamEx;

public class ParallelExample {

private static final Logger logger = LoggerFactory.getLogger(ParallelExample.class);
private static ForkJoinPool s3ThreadPool = new ForkJoinPool(3);

public static List<String> getTestList(){
int listSize = 10;
List<String> testList = new ArrayList<>();
for (int i=0; i<listSize; i++)
testList.add("item_" + i);
return testList;
}

public static void executeAsParallelFromList(){
logger.info("executeAsParallelFromList():");
List<String> testList = getTestList();
StreamEx<String> streamOfItems = StreamEx
.of(testList)
.parallel(s3ThreadPool);
logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
streamOfItems.forEach(item -> handleItem(item));
}

public static void executeAsParallelAfterMap(){
logger.info("executeAsParallelAfterMap():");
List<String> testList = getTestList();
StreamEx<String> streamOfItems = StreamEx
.of(testList)
.map(item -> item+"_mapped")
.parallel(s3ThreadPool);
logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
streamOfItems.forEach(item -> handleItem(item));
}

private static void handleItem(String item){
// do something with the item - just print for now
logger.info("I'm handling item: {}", item);
}

}

执行两种方法的单元测试:

public class ParallelExampleTest {

@Test
public void testExecuteAsParallelFromList() {
ParallelExample.executeAsParallelFromList();
}

@Test
public void testExecuteAsParallelFromStreamEx() {
ParallelExample.executeAsParallelAfterMap();
}

}

执行结果:

08:49:12.992 [main] INFO  marina.streams.ParallelExample - executeAsParallelFromList():
08:49:13.002 [main] INFO marina.streams.ParallelExample - streamOfItems.isParallel(): true
08:49:13.040 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_6
08:49:13.040 [ForkJoinPool-1-worker-2] INFO marina.streams.ParallelExample - I'm handling item: item_2
08:49:13.040 [ForkJoinPool-1-worker-3] INFO marina.streams.ParallelExample - I'm handling item: item_1
08:49:13.041 [ForkJoinPool-1-worker-2] INFO marina.streams.ParallelExample - I'm handling item: item_4
08:49:13.041 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_8
08:49:13.041 [ForkJoinPool-1-worker-3] INFO marina.streams.ParallelExample - I'm handling item: item_0
08:49:13.041 [ForkJoinPool-1-worker-2] INFO marina.streams.ParallelExample - I'm handling item: item_3
08:49:13.041 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_9
08:49:13.041 [ForkJoinPool-1-worker-3] INFO marina.streams.ParallelExample - I'm handling item: item_5
08:49:13.041 [ForkJoinPool-1-worker-2] INFO marina.streams.ParallelExample - I'm handling item: item_7

08:49:13.043 [main] INFO marina.streams.ParallelExample - executeAsParallelAfterMap():
08:49:13.043 [main] INFO marina.streams.ParallelExample - streamOfItems.isParallel(): true
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_0_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_1_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_2_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_3_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_4_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_5_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_6_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_7_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_8_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_9_mapped

如您所见,执行 executeAsParallelFromList() 时使用了所有三个线程,但执行 executeAsParallelAfterMap() 时仅使用了一个线程。

为什么?

谢谢!

jetty

注意:这个例子是故意简单化的——为了演示这个问题,我试图让它尽可能简单。显然,在现实生活中,map()、handleItem() 等中发生的事情要多得多,输入数据也更有趣(我正在尝试并行处理 AWS S3 存储桶/前缀)。

最佳答案

简单的回答:这是一个错误。我 filedfixed它。这被测试忽略了,因为测试只检查所有操作是否在指定的池中执行,但不检查是否使用了池的不同线程(有时如果并行化不起作用也没关系,例如对于一个元素的流).

0.6.4 版本中提供了一个修复程序。在以前的版本中,您可以考虑使用 .parallel().parallel(fjp) 解决该问题:它应该正确并行化。

请考虑向官方 StreamEx 报告 StreamEx 问题 issue tracker .这些天我只是偶尔访问StackOverflow,所以可能会忽略这里报告的问题。

关于java - StreamEx.parallel().forEach() 在 .map() 之后不并行运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40177377/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com