gpt4 book ai didi

java - 并行转换流时如何使用收集器

转载 作者:塔克拉玛干 更新时间:2023-11-03 03:19:34 26 4
gpt4 key购买 nike

我实际上试图回答这个问题How to skip even lines of a Stream<String> obtained from the Files.lines .所以我认为这个收集器不能很好地并行工作:

private static Collector<String, ?, List<String>> oddLines() {
int[] counter = {1};
return Collector.of(ArrayList::new,
(l, line) -> {
if (counter[0] % 2 == 1) l.add(line);
counter[0]++;
},
(l1, l2) -> {
l1.addAll(l2);
return l1;
});
}

但它有效。

编辑:它实际上没有用;我被我的输入集太小而无法触发任何并行性这一事实所愚弄;查看评论中的讨论

我认为这行不通,因为我想到了以下两个执行计划。


1。 counter 数组在所有线程之间共享。

线程 t1 读取了 Stream 的第一个元素,因此满足 if 条件。它将第一个元素添加到它的列表中。然后在他有时间更新数组值之前执行停止。

线程 t2,表示从流的第 4 个元素开始,将其添加到其列表中。所以我们最终得到了一个不需要的元素。

当然,既然这个收集器看起来可以工作,我猜它不会那样工作。无论如何更新都不是原子的。


2。每个线程都有自己的数组副本

在这种情况下,更新没有更多问题,但没有什么能阻止我线程 t2 不会从流的第 4 个元素开始。所以他也不是那样工作的。


所以它似乎根本不是那样工作的,这让我想到了一个问题......收集器是如何并行使用的?

有人能基本解释一下它是如何工作的,以及为什么我的收集器在并行运行时工作吗?

非常感谢!

最佳答案

parallel() 源流传递到您的收集器中足以破坏逻辑,因为您的共享状态(计数器)可能会增加来自不同的任务。您可以验证这一点,因为它永远不会为任何有限流输入返回正确的结果:

    Stream<String> lines = IntStream.range(1, 20000).mapToObj(i -> i + "");
System.out.println(lines.isParallel());
lines = lines.parallel();
System.out.println(lines.isParallel());

List<String> collected = lines.collect(oddLines());

System.out.println(collected.size());

请注意,对于无限流(例如,当从 Files.lines() 中读取时),您需要在流中生成大量数据,因此它实际上是 fork 的同时运行一些 block 的任务。

我的输出是:

false
true
12386

这显然是错误的。


正如 @Holger 在评论中正确指出的那样,当您的收集器指定 CONCURRENTUNORDERED 时,可能会发生不同的竞争,在这种情况下,它们运行在跨任务的单个共享集合(ArrayList::new 每个流调用一次),其中 - 仅使用 parallel() 它将在集合 上运行累加器每个任务,然后使用您定义的组合器组合结果。

如果将特征添加到收集器,由于单个集合中的共享状态,您可能会遇到以下结果:

false
true
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 73
at java.util.ArrayList.add(ArrayList.java:459)
at de.jungblut.stuff.StreamPallel.lambda$0(StreamPallel.java:18)
at de.jungblut.stuff.StreamPallel$$Lambda$3/1044036744.accept(Unknown Source)
at java.util.stream.ReferencePipeline.lambda$collect$207(ReferencePipeline.java:496)
at java.util.stream.ReferencePipeline$$Lambda$6/2003749087.accept(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:496)
at de.jungblut.stuff.StreamPallel.main(StreamPallel.java:32)12386

关于java - 并行转换流时如何使用收集器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30171322/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com