gpt4 book ai didi

scala - 作为执行程序和线程数量的函数,spark中的分区数量是多少?

转载 作者:行者123 更新时间:2023-12-01 10:21:07 26 4
gpt4 key购买 nike

我在EMR上使用Spark。我启动一个集群,有时集群很小(在编写/测试代码时),例如5-10个实例。其他时间则使用大量实例(例如30-50)执行同一代码。

我知道我可以访问配置以帮助设置分区数,选择合适的分区数有助于运行时。

我想根据执行程序的数量和线程的数量来参数化分区的数量:

val instanceCount = sc.getConf.get("spark.executor.instances").toDouble
val coreCount = sc.getConf.get("spark.executor.cores").toDouble

有没有人研究过这个问题,并且可以就分区数量的参数化提供任何建议?

我意识到,没有一个好的答案,而是某些函数形式,常量将有所帮助。例如:
val partitionCount = instanceCount*coreCount*0.7 

在我的用例中似乎工作得很好,并描述您的用例(执行者的数量/范围),这将有所帮助。

在答案中,如果您可以指出实例的范围,那么您的工作也会很有帮助。如果在某处对此进行了规范的调查,则指向该点的指针将很有帮助。

最佳答案

没有针对所有用例的最佳配置,但是我将向您提供我在Spark体验中收集的所有启发式方法。

分区多于核心

首先,让我们说明一下显而易见的内容。您需要的分区(给定阶段中的任务)比内核更多,否则某些内核将无所事事。此经验法则是否有例外?是:

  • 您也可以并行运行多个作业。假设您有1000个小型数据集,则需要独立于其他数据集对它们进行一些转换。您可能不想将每个数据集划分为128k文件,但是您可以并行运行128个分区的多个作业,以最大程度地利用核心数量。请注意,我只知道如何通过设置spark.scheduler.mode=FAIR在一个步骤内或在自定义托管的YARN集群上执行此操作。我从来没有尝试过提交并行的EMR步骤,也不知道是否有可能,这不是常规的YARN概念(但同样,如果需要,您可以在同一步骤中完成)。
  • 您的任务本身是并行的。绝对不是常规用例,我一般不建议这样做,但是我不得不在Spark上并行化一些MXNet分类代码。 Java代码创建一个使用MXNet进行预测的Python进程,然后将结果返回给Java。由于MXNet在内部是并行的,并且非常善于使用内核,因此我发现通过拥有尽可能多的计算机(因此采用尽可能小的实例)并且每台计算机只有两个执行程序(容器),吞吐量会更高。每个执行者都创建一个用于服务4个Spark任务(分区)的MXNet进程,这足以最大化我的CPU使用率。在不限制MXNet进程数量的情况下,CPU始终固定为100%,并且浪费了上下文切换中的大量时间。

  • 每个分区有大量数据

    小分区会导致作业变慢,因为驱动程序和从属之间存在一定量的通信,如果您有10万个小任务,这确实会占用大量时间。如果您的任务在1秒内完成,则分区肯定太小。

    相反,大分区会损害内存,尤其是在随机播放期间。随机播放对内存的要求很高,将使您进行大量垃圾回收。如果分区太大,则会增加内存用尽的风险,或者最多花费50%以上的时间在GC中。序列化形式的2GB是分区大小的绝对限制,因为Spark使用Java Java IO实用程序,该实用程序由字节数组支持,该字节数组只能容纳 2^31 - 1( int的大小)元素。

    通常,如果您要随机播放(建议在此处主要讨论JSON和文本数据),则建议以压缩格式保存大约25MB-70MB。

    广播节目

    如果您需要向所有执行者广播一些对象(例如,布隆过滤器,用于在改组数据集之前减小其大小),则所需的容器数将由您愿意使用的内存量决定。每台机器都可以保存您的广播。实际上,该对象将对每个执行器广播一次,因此,假设同构集群,每台机器的广播数据量为 object_size * num_executors / num_ec2_instances。网络成本也随着容器数量的增加而增加,因为对象需要多次广播到每个EC2实例。

    但是,我遇到的情况是,我的广播对象是一个逻辑模型,在分类过程中使用了一些内部可变状态。这意味着predict方法已同步,并且容器中的所有线程都在争夺访问此锁的权限。通过增加容器的数量(从而增加广播的内存和网络成本),我使这项工作快了4倍。

    摘要

    分区的数量由数据的大小决定,而不是由可用内核的数量决定。如果您的数据不需要超过200个分区,那么就不要采用大于200个核心的集群,如果分区的大小已经足够合理,那么增加分区和核心的数量可能不会有任何有意义的速度。

    只要您的数据大小合适且分区平衡,剩下的唯一试探法是:
  • 使用的内核至少与分区
  • 一样多
    如果想增加吞吐量,但
  • 可以并行运行多个作业(如果可以的话),但是您的分区已经是适当大小的
  • 避免在您的任务中运行多线程代码,但是在极少数情况下,您需要考虑使用比内核少的分区,而不是核心。
  • 拥有的容器越多,广播的成本就越高(广播期间的网络活动更多,并且直到销毁为止的内存使用更多)。如果广播的对象是完全不变的,请尝试使用尽可能少的容器。如果您的容器具有某种内部状态并需要锁定,则每个容器中的线程过多可能会增加争用并降低速度。
  • 关于scala - 作为执行程序和线程数量的函数,spark中的分区数量是多少?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52412679/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com