
java - Understanding the main loop in the Stream API's ForEachTask

Reprinted. Author: 塔克拉玛干. Updated: 2023-11-03 03:02:03

It seems that the core of Java Streams parallelization is the ForEachTask. Understanding its logic appears essential to acquiring the mental model needed to anticipate the concurrent behavior of client code written against the Streams API. However, I find my expectations contradicted by the actual behavior.

For reference, here is the key compute() method (java/util/stream/ForEachOps.java:253):

public void compute() {
    Spliterator<S> rightSplit = spliterator, leftSplit;
    long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
    if ((sizeThreshold = targetSize) == 0L)
        targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
    boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
    boolean forkRight = false;
    Sink<S> taskSink = sink;
    ForEachTask<S, T> task = this;
    while (!isShortCircuit || !taskSink.cancellationRequested()) {
        if (sizeEstimate <= sizeThreshold ||
            (leftSplit = rightSplit.trySplit()) == null) {
            task.helper.copyInto(taskSink, rightSplit);
            break;
        }
        ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
        task.addToPendingCount(1);
        ForEachTask<S, T> taskToFork;
        if (forkRight) {
            forkRight = false;
            rightSplit = leftSplit;
            taskToFork = task;
            task = leftTask;
        }
        else {
            forkRight = true;
            taskToFork = leftTask;
        }
        taskToFork.fork();
        sizeEstimate = rightSplit.estimateSize();
    }
    task.spliterator = null;
    task.propagateCompletion();
}

Described at a high level, the main loop keeps splitting off chunks of the spliterator, alternately forking off a chunk and processing one inline, until the spliterator refuses to split further or the remaining size drops below the computed threshold.

Now consider the above algorithm in the case of a stream of unknown size, where the whole is not being split into roughly equal halves; instead, chunks of a predetermined size are repeatedly taken from the head of the stream. In that case the "suggested target size" of a chunk is abnormally large, which essentially means the chunks are never re-split into smaller ones.
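For context, here is a minimal sketch of that arithmetic. The helper below mirrors what OpenJDK's AbstractTask.suggestTargetSize does, dividing the size estimate by four times the common pool parallelism; treat the exact formula as an assumption to be checked against your JDK version. An unsized stream reports Long.MAX_VALUE as its estimate, so the resulting threshold dwarfs any 200-element batch:

import java.util.Spliterator;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Stream;

public class TargetSizeSketch {
    // Mirrors OpenJDK's AbstractTask.suggestTargetSize (assumed formula):
    // the size estimate divided by 4x the common pool's parallelism.
    static long suggestTargetSize(long sizeEstimate) {
        long est = sizeEstimate / (ForkJoinPool.getCommonPoolParallelism() << 2);
        return est > 0L ? est : 1L;
    }

    public static void main(String[] args) {
        // An unsized stream reports Long.MAX_VALUE as its size estimate...
        Spliterator<String> unsized = Stream.generate(() -> "x").spliterator();
        long estimate = unsized.estimateSize();
        // ...so the derived threshold is astronomical and a 200-element
        // batch always stays below it, i.e. batches are never re-split.
        System.out.println("size estimate    = " + estimate);
        System.out.println("target threshold = " + suggestTargetSize(estimate));
    }
}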

Therefore the algorithm appears to alternate between forking off one chunk and processing one inline. If every chunk takes the same time to process, this should result in no more than two cores being used. However, the actual behavior is that all four cores on my machine are occupied. Obviously, I am missing an important piece of the puzzle.

What am I missing?


Appendix: Testing code

Here is a self-contained piece of code that can be used to test the behavior this question is about:

package test;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static test.FixedBatchSpliteratorWrapper.withFixedSplits;

import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class Parallelization {
    static final AtomicLong totalTime = new AtomicLong();
    static final ExecutorService pool = Executors.newFixedThreadPool(4);

    public static void main(String[] args) throws IOException {
        final long start = System.nanoTime();
        final Path inputPath = createInput();
        System.out.println("Start processing");
        try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) {
            withFixedSplits(Files.newBufferedReader(inputPath).lines(), 200)
                .map(Parallelization::processLine)
                .forEach(w::println);
        }
        final double cpuTime = totalTime.get(), realTime = System.nanoTime() - start;
        final int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("          Cores: " + cores);
        System.out.format("       CPU time: %.2f s\n", cpuTime / SECONDS.toNanos(1));
        System.out.format("      Real time: %.2f s\n", realTime / SECONDS.toNanos(1));
        System.out.format("CPU utilization: %.2f%%", 100.0 * cpuTime / realTime / cores);
    }

    private static String processLine(String line) {
        final long localStart = System.nanoTime();
        double ret = 0;
        for (int i = 0; i < line.length(); i++)
            for (int j = 0; j < line.length(); j++)
                ret += Math.pow(line.charAt(i), line.charAt(j) / 32.0);
        final long took = System.nanoTime() - localStart;
        totalTime.getAndAdd(took);
        return NANOSECONDS.toMillis(took) + " " + ret;
    }

    private static Path createInput() throws IOException {
        final Path inputPath = Paths.get("input.txt");
        try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(inputPath))) {
            for (int i = 0; i < 6_000; i++) {
                final String text = String.valueOf(System.nanoTime());
                for (int j = 0; j < 20; j++)
                    w.print(text);
                w.println();
            }
        }
        return inputPath;
    }
}
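And here is the FixedBatchSpliteratorWrapper that the test above depends on: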

package test;

import static java.util.Spliterators.spliterator;
import static java.util.stream.StreamSupport.stream;

import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class FixedBatchSpliteratorWrapper<T> implements Spliterator<T> {
    private final Spliterator<T> spliterator;
    private final int batchSize;
    private final int characteristics;
    private long est;

    public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, long est, int batchSize) {
        final int c = toWrap.characteristics();
        this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c;
        this.spliterator = toWrap;
        this.batchSize = batchSize;
        this.est = est;
    }
    public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap, int batchSize) {
        this(toWrap, toWrap.estimateSize(), batchSize);
    }

    public static <T> Stream<T> withFixedSplits(Stream<T> in, int batchSize) {
        return stream(new FixedBatchSpliteratorWrapper<>(in.spliterator(), batchSize), true);
    }

    @Override public Spliterator<T> trySplit() {
        final HoldingConsumer<T> holder = new HoldingConsumer<>();
        if (!spliterator.tryAdvance(holder)) return null;
        final Object[] a = new Object[batchSize];
        int j = 0;
        do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder));
        if (est != Long.MAX_VALUE) est -= j;
        return spliterator(a, 0, j, characteristics());
    }
    @Override public boolean tryAdvance(Consumer<? super T> action) {
        return spliterator.tryAdvance(action);
    }
    @Override public void forEachRemaining(Consumer<? super T> action) {
        spliterator.forEachRemaining(action);
    }
    @Override public Comparator<? super T> getComparator() {
        if (hasCharacteristics(SORTED)) return null;
        throw new IllegalStateException();
    }
    @Override public long estimateSize() { return est; }
    @Override public int characteristics() { return characteristics; }

    static final class HoldingConsumer<T> implements Consumer<T> {
        Object value;
        @Override public void accept(T value) { this.value = value; }
    }
}

Best Answer

Ironically, the answer is almost stated in the question: because the "left" and "right" tasks take turns being forked vs. processed inline, half of the time it is the right task, represented by this, i.e. the complete rest of the stream, that gets forked off. That means the forking off of chunks is merely slowed down a bit (it happens every other iteration), but it clearly does happen.
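To observe this concretely, the following sketch (which assumes the FixedBatchSpliteratorWrapper from the appendix is on the class path) records every worker thread that ends up processing a batch; on a multi-core machine it typically reports more than two workers:

package test;

import static test.FixedBatchSpliteratorWrapper.withFixedSplits;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

public class WorkerSpreadCheck {
    public static void main(String[] args) {
        // Record the name of every fork/join worker that processes an element.
        Set<String> workers = ConcurrentHashMap.newKeySet();
        withFixedSplits(Stream.generate(() -> "x").limit(100_000), 200)
                .forEach(s -> workers.add(Thread.currentThread().getName()));
        // Since every other loop iteration forks off the rest of the stream,
        // the worker stealing it runs the same loop and keeps forking batches,
        // so batch processing spreads beyond a single pair of threads.
        System.out.println(workers.size() + " distinct workers: " + workers);
    }
}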

Regarding "java - Understanding the main loop in the Stream API's ForEachTask", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/23196461/
