gpt4 book ai didi

java - Java并行流的性能影响

转载 作者:行者123 更新时间:2023-12-01 12:56:02 25 4
gpt4 key购买 nike

使用.stream().parallel()的最佳实践是什么?

例如,如果您有一堆阻塞的I / O调用,并且想要检查.anyMatch(...),则并行执行此操作似乎是明智的选择。

示例代码:

public boolean hasAnyRecentReference(JobId jobid) {
<...>
return pendingJobReferences.stream()
.parallel()
.anyMatch(pendingRef -> {
JobReference readReference = pendingRef.sync();
Duration referenceAge = timeService.timeSince(readReference.creationTime());
return referenceAge.lessThan(maxReferenceAge)
});
}

乍看之下这是明智的,因为我们可以同时执行多个阻塞读取,因为我们只关心匹配的内容,而不是一个接一个地检查(因此,如果每次读取花费50ms,我们只需要等待(50ms * ExpectedNumberOfNonRecentRefs)/ numThreads )。

在生产环境中引入此代码是否会对代码库的其他部分产生无法预料的性能影响?

最佳答案

编辑:@edharned指出,.parallel()现在使用CountedCompleter而不是调用.join(),这有其自身的问题,也由Ed在What is currently being done?部分的http://coopsoft.com/ar/Calamity2Article.html中解释过。

我相信下面的信息对于理解为什么fork-join框架是棘手的,并且结论中提议的.parallel()的替代方案仍然有意义仍然有用。

尽管代码的精神是正确的,但实际的代码可能会对使用.parallel()的所有代码产生系统范围的影响,即使这一点并不明显。

前一段时间,我发现了一篇不建议这样做的文章:https://dzone.com/articles/think-twice-using-java-8,但是直到最近我才更深入地研究。

经过一堆阅读后,这些是我的想法:

  • Java中的.parallel()使用ForkJoinPool.commonPool(),这是所有流共享的单例ForkJoinPool(ForkJoinPool.commonPool()是公共静态方法,因此从理论上讲,其他库/部分代码可以使用它)
  • ForkJoinPool实现工作窃取,除了共享队列外,还具有每个线程的队列
  • 工作窃取意味着当线程空闲时,它将寻找更多工作来做
  • 最初,我认为:按照这个定义,cached线程池也不会窃取工作(即使某些引用称它为缓存的线程池进行工作共享)吗?
  • 事实证明,在使用单词idle时似乎存在一些术语模糊性:
  • cached线程池中,线程只有在完成其任务后才处于空闲状态。如果它在等待阻塞调用
  • 时被阻塞,则 不会变为空闲
  • forkjoin线程池中,线程在完成其任务时或在子任务上调用.join()方法(这是一个特殊的阻塞调用)时都是空闲的。

    在子任务上调用.join()时,线程在等待该子任务完成时变为空闲状态。空闲时,即使它在另一个线程的队列中(它将窃取工作),它也会尝试执行任何其他可用任务。

    [这是重要的位] 一旦找到另一个要执行的任务,即使线程等待执行的子任务在线程仍在执行被盗任务的同时完成,它也必须先完成它才能恢复其原始执行。

    [这也很重要] 此工作窃取行为仅适用于调用.join()的线程。如果某个线程在诸如I / O之类的其他事物上阻塞,则它将变为空闲状态(即,它不会窃取工作)。
  • Java流不允许您提供自定义的ForkJoinPool,但是https://github.com/amaembo/streamex可以提供

  • 我花了一些时间来了解 2.3.2的含义,因此我将举一个简单的示例来帮助说明问题:

    注意:这些是虚拟的示例,但是您可以进入等价情况,而无需使用内部实现派生联接内容的流来意识到它。

    另外,我将使用极其简化的伪代码,仅用于说明.parallel()问题,但在其他方面不一定有意义。

    假设我们正在实施合并排序
    merge_sort(list):
    left, right = split(list)

    leftTask = mergeSortTask(left).fork()
    rightTask = mergeSortTaks(right).fork()

    return merge(leftTask.join(), rightTask.join())

    现在,我们还有另一段执行以下操作的代码:
    dummy_collect_results(queriesIds):
    pending_results = []

    for id in queriesIds:
    pending_results += longBlockingIOTask(id).fork()

    // do more stuff

    这里会发生什么?

    编写合并排序代码时,您认为排序调用不会执行任何I / O,因此它们的性能应该是确定性的,对吗?

    对。您可能不会想到的是,由于 dummy_collect_results方法创建了许多长时间运行且阻塞的子任务,当执行mergesort任务的线程在 .join()上阻塞时,等待子任务的完成,它们可能会开始执行长时间阻塞的子任务之一。

    这是不好的,因为如上所述,一旦长阻塞(在I / O上,不是 .join()调用,因此线程不会再次变为空闲状态)被盗,无论子任务是否存在,都必须完成它阻止I / O时,一个线程正在等待通过 .join()完成。

    这使得mergesort任务的执行不再是确定性的,因为执行这些任务的线程可能最终会窃取完全驻留在其他地方的代码所生成的I / O密集型任务。

    这也是非常令人恐惧和难以捉摸的,因为您可能在整个代码库中一直使用 .parallel()却没有任何问题,而所需要的只是一个类,它在使用 .parallel()时引入了长期运行的任务,并且突然之间所有其他代码部分基础可能会获得不一致的性能。

    所以我的结论是:

    从理论上讲,如果可以保证将要在代码中任何位置创建的所有任务都是简短的,则 .parallel()很好
  • .parallel()可能对系统范围的性能没有明显的影响,除非您知道(例如,如果以后添加使用.parallel()并具有较长任务的单个代码,则可能会影响使用.parallel()的所有代码的性能)
  • 由于使用了2.,因此最好完全避免使用.parallel(),而最好使用ExecutorCompletionService或使用https://github.com/amaembo/streamex,后者允许您提供自己的ForkJoinPool(允许更多隔离)。更好的是,您可以使用https://github.com/palantir/streams/blob/1.9.1/src/main/java/com/palantir/common/streams/MoreStreams.java#L53,它使您可以更精细地控制并发机制。
  • 关于java - Java并行流的性能影响,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54580839/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com