gpt4 book ai didi

apache-spark - Spark 中的阶段如何划分为任务?

转载 作者:行者123 更新时间:2023-12-03 04:32:18 25 4
gpt4 key购买 nike

我们假设以下情况每个时间点只有一个 Spark 作业正在运行。

到目前为止我得到了什么

以下是我对 Spark 中发生的情况的理解:

  1. 当创建 SparkContext 时,每个工作节点都会启动一个执行程序。执行器是独立的进程(JVM),连接回驱动程序。每个执行器都有驱动程序的jar。退出驱动程序,关闭执行程序。每个执行器可以容纳一些分区。
  2. 执行作业时,会根据谱系图创建执行计划。
  3. 执行作业分为多个阶段,其中阶段包含尽可能多的相邻(沿袭图中)转换和操作,但没有随机播放。因此阶段是通过洗牌来分隔的。

image 1

我明白

  • A task is a command sent from the driver to an executor by serializing the Function object.
  • The executor deserializes (with the driver jar) the command (task) and executes it on a partition.

但是

问题

如何将阶段划分为这些任务?

具体:

  1. 任务是由转换和操作决定的,还是一个任务中可以有多个转换/操作?
  2. 任务是否由分区确定(例如,每个分区每个阶段一个任务)。
  3. 任务是否由节点决定(例如,每个节点每个阶段一个任务)?

我的想法(仅部分回答,即使是正确的)

https://0x0fff.com/spark-architecture-shuffle ,用图片解释随机播放

enter image description here

我的印象是规则是

each stage is split into #number-of-partitions tasks, with no regard for the number of nodes

对于我的第一张图像,我会说我有 3 个映射任务和 3 个归约任务。

对于来自 0x0fff 的图像,我认为有 8 个映射任务和 3 个归约任务(假设只有 3 个橙色文件和 3 个深绿色文件)。

无论如何都存在开放性问题

这是正确的吗?但即使这是正确的,我上面的问题也没有全部得到解答,因为它仍然是开放的,无论多个操作(例如多个 map )是在一个任务中还是每个操作分成一个任务。

别人怎么说

What is a task in Spark? How does the Spark worker execute the jar file?How does the Apache Spark scheduler split files into tasks?类似,但我觉得我的问题没有得到明确的回答。

最佳答案

你这里有一个非常好的轮廓。回答您的问题

  • 需要为每个阶段的每个数据分区启动单独的任务。考虑到每个分区可能驻留在不同的物理位置 - 例如HDFS 中的 block 或本地文件系统的目录/卷。

请注意,Stage 的提交是由 DAG Scheduler 驱动的。这意味着不相互依赖的阶段可以提交到集群并行执行:这最大化了集群上的并行化能力。因此,如果数据流中的操作可以同时发生,我们将期望看到多个阶段启动。

我们可以在下面的玩具示例中看到它的实际效果,其中我们执行以下类型的操作:

  • 加载两个数据源
  • 分别对两个数据源执行一些映射操作
  • 加入他们
  • 对结果执行一些映射和过滤操作
  • 保存结果

那么我们最终会经历多少阶段?

  • 并行加载两个数据源各 1 个阶段 = 2 个阶段
  • 代表连接的第三阶段依赖其他两个阶段
  • 注意:对连接数据进行的所有后续操作都可以在同一阶段执行,因为它们必须按顺序发生。启动额外的阶段没有任何好处,因为在之前的操作完成之前它们无法开始工作。

这是那个玩具程序

val sfi  = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) }
val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}
val spj = sfi.join(sp)
val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}
val sf = sm.filter{ case (k,v) => v % 10 == 0 }
sf.saveAsTextFile("/data/blah/out")

这是结果的 DAG

enter image description here

现在:有多少任务?任务数量应等于

(阶段 * 阶段中的#Partitions)的总和

关于apache-spark - Spark 中的阶段如何划分为任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37528047/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com