
apache-spark - Which nodes are used for the aggregation and reduction of reduce?


I am curious about how the reduce(f: (T, T) => T): T function in Spark works.

When I have X nodes processing data and then aggregate the resulting data using reduce, how exactly does the aggregation work? How many nodes are involved in the aggregation and reduction?

By nodes I mean the driver and the executors of the Spark application.

Best Answer

Looking into the code, you will find the following comment:

/**
 * Reduces the elements of this RDD using the specified commutative and
 * associative binary operator.
 */
def reduce(f: (T, T) => T): T

The restrictions on the function are important to ensure that reduce works correctly. As for "how it works": reduce is applied locally on each partition, and the partial results are then reduced together to obtain the final result.
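This two-level structure is visible in the body of reduce itself. Below is a simplified sketch of how RDD.reduce is organized in Spark's source (paraphrased, and the exact details vary between versions): each task folds its partition down to one value, and a result handler on the driver merges the per-partition values as they arrive.

// Simplified sketch of RDD.reduce, paraphrased from Spark's source;
// exact details vary between versions.
def reduce(f: (T, T) => T): T = {
  // Runs inside each task, on the executor holding the partition:
  // fold the partition's elements down to a single value.
  val reducePartition: Iterator[T] => Option[T] =
    iter => if (iter.hasNext) Some(iter.reduceLeft(f)) else None

  // Runs on the driver: merge per-partition results as tasks finish.
  var jobResult: Option[T] = None
  val mergeResult = (_: Int, taskResult: Option[T]) =>
    taskResult.foreach { value =>
      jobResult = jobResult.map(f(_, value)).orElse(Some(value))
    }

  sc.runJob(this, reducePartition, mergeResult)
  jobResult.getOrElse(
    throw new UnsupportedOperationException("empty collection"))
}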

So each of the X nodes holding data runs the reduce operation in parallel, and their results are aggregated together on the driver node. The assumption that the function is commutative and associative guarantees a stable result, i.e. one that is independent of the order in which the function is applied to the data.
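To see why this restriction matters, consider a hypothetical counterexample (not part of the original answer): subtraction is neither commutative nor associative, so the outcome depends on where partition boundaries fall and on the order in which partial results reach the driver.

// Hypothetical counterexample: subtraction is neither commutative
// nor associative, so reduce gives partition-dependent results.
val nums = sc.parallelize(1 to 10, 3)
nums.reduce(_ + _)  // always 55, regardless of partitioning
nums.reduce(_ - _)  // unstable: varies with partition boundaries and
                    // the order in which task results are merged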

A simplified example:

Given an RDD with 3 partitions spread across executors:

rdd = 

p1 - [1, 3, 5, 7, 9, 11]
p2 - [2, 3, 5, 7, 11, 13]
p3 - [1, 1, 2, 3, 5, 8]

Then rdd.reduce(_ + _):

+--------+------------- stage 1 --------------+ to driver +--------------------------+
| EX1    | [1, 3, 5, 7, 9, 11 ].reduce(_ + _) | => 36     |                          |
| EX2    | [2, 3, 5, 7, 11, 13].reduce(_ + _) | => 41     |                          |
| EX3    | [1, 1, 2, 3, 5, 8  ].reduce(_ + _) | => 20     |                          |
| Driver |                                    |           | (36, 41, 20).reduce(_+_) |
+--------+------------------------------------+-----------+--------------------------+
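The same two-stage computation can be modelled with plain Scala collections (a local sketch of the table above, not the Spark API):

// Stage 1: each executor reduces its own partition.
val p1 = List(1, 3, 5, 7, 9, 11)
val p2 = List(2, 3, 5, 7, 11, 13)
val p3 = List(1, 1, 2, 3, 5, 8)
val partials = List(p1, p2, p3).map(_.reduce(_ + _))  // List(36, 41, 20)

// Stage 2: the driver merges the partial results.
val total = partials.reduce(_ + _)                    // 97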

After running such a simple action, the same pattern can be observed in the Spark logs:

val rdd = sc.parallelize(1 to 15, 3)
rdd.reduce(_ + _)  // returns 120

TaskSchedulerImpl: Adding task set 1.0 with 3 tasks
TaskSetManager: Starting task 0.0 in stage 1.0 (TID 3, localhost, PROCESS_LOCAL, 1220 bytes)
TaskSetManager: Starting task 1.0 in stage 1.0 (TID 4, localhost, PROCESS_LOCAL, 1220 bytes)
TaskSetManager: Starting task 2.0 in stage 1.0 (TID 5, localhost, PROCESS_LOCAL, 1220 bytes)
Executor: Running task 1.0 in stage 1.0 (TID 4)
Executor: Running task 0.0 in stage 1.0 (TID 3)
Executor: Running task 2.0 in stage 1.0 (TID 5)
Executor: Finished task 1.0 in stage 1.0 (TID 4). 727 bytes result sent to driver
Executor: Finished task 0.0 in stage 1.0 (TID 3). 727 bytes result sent to driver
Executor: Finished task 2.0 in stage 1.0 (TID 5). 727 bytes result sent to driver
TaskSetManager: Finished task 1.0 in stage 1.0 (TID 4) in 4 ms on localhost (1/3)
TaskSetManager: Finished task 0.0 in stage 1.0 (TID 3) in 6 ms on localhost (2/3)
TaskSetManager: Finished task 2.0 in stage 1.0 (TID 5) in 6 ms on localhost (3/3)
DAGScheduler: Stage 1 (reduce at <console>:14) finished in 0.007 s
TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
DAGScheduler: Job 1 finished: reduce at <console>:14, took 0.014196 s

Regarding "apache-spark - Which nodes are used for the aggregation and reduction of reduce?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/28942376/
