gpt4 book ai didi

apache-spark - 为什么 Spark 为一个简单的聚合运行 5 个作业?

转载 作者:行者123 更新时间:2023-12-04 04:50:02 24 4
gpt4 key购买 nike

我在 IDE/eclipse 中以 local 模式使用 Spark。

我可以看到 Spark UI 为一个简单的聚合创建了许多作业。为什么?

import org.apache.spark.sql.SparkSession

trait SparkSessionWrapper {

lazy val spark: SparkSession = {
SparkSession
.builder()
.master("local[2]")
.appName("Spark Me")
.getOrCreate()
}

spark.sparkContext.setLogLevel("WARN")

}

Spark应用程序如下:
object RowNumberCalc
extends App
with SparkSessionWrapper {

import spark.implicits._

val cityDf = Seq(
("London", "Harish",5500,"2019-10-01"),
("NYC","RAJA",11121,"2019-10-01"),
("SFO","BABU",77000,"2019-10-01"),
("London","Rick",7500,"2019-09-01"),
("NYC","Jenna",6511,"2019-09-01"),
("SFO","Richard",234567,"2019-09-01"),
("London","Harish",999999,"2019-08-01"),
("NYC","Sam",1234,"2019-08-01"),
("SFO","Dylan",45678,"2019-08-01")).toDF("city","name","money","month_id")

cityDf.createOrReplaceTempView("city_table")
val totalMoneySql =
"""
|select city, sum(money) from city_table group by 1 """.stripMargin
spark.sql(totalMoneySql).show(false)


System.in.read
spark.stop()

}

如图简单计算每个城市的 Sum of Money
现在 SPARK-UI 显示 ==> 5 JOBS 每个 2 个阶段 _0x10456792
enter image description here

SQL 选项卡也显示 5 个作业。

enter image description here

但是 物理计划 显示正确的 阶段 划分
== Physical Plan ==
CollectLimit 21
+- *(2) LocalLimit 21
+- *(2) HashAggregate(keys=[city#9], functions=[sum(cast(money#11 as bigint))], output=[city#9, sum(money)#24])
+- Exchange hashpartitioning(city#9, 200)
+- *(1) HashAggregate(keys=[city#9], functions=[partial_sum(cast(money#11 as bigint))], output=[city#9, sum#29L])
+- LocalTableScan [city#9, money#11]

从哪里/如何 5 JOBS 被触发???

最佳答案

tl;博士 在默认的 200 个分区中,您需要处理很少的行(9 个作为主要输入和 3 个聚合),因此有 5 个 Spark 作业可以满足 Dataset.show 的要求。显示 20 行。

换句话说,你所经历的是Dataset.show -特定的(顺便说一下,这不适用于大型数据集,不是吗?)

默认 Dataset.show显示 20 行。它从 1 个分区开始,最多占用 20 行。如果没有足够的行,它会乘以 4(如果我没记错的话)并扫描其他 4 个分区以找到丢失的行。这一直有效,直到收集到 20 行。

最后的输出行数 HashAggregate是 3 行。

根据这 3 行在 Spark 中的分区,可以运行一个、两个或多个作业。它强烈依赖于行的散列(每 HashPartitioner)。

如果你真的想看到这个行数(9 个输入)的单个 Spark 作业,用 spark.sql.shuffle.partitions 启动 Spark 应用程序配置属性为 1 .

这将在聚合后使用 1 个分区进行计算,并将所有结果行放在一个分区中。

关于apache-spark - 为什么 Spark 为一个简单的聚合运行 5 个作业?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58565068/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com