gpt4 book ai didi

apache-spark - 如何知道(广播)联接查询中的Spark作业和阶段数?

转载 作者:行者123 更新时间:2023-12-03 15:26:40 25 4
gpt4 key购买 nike

我使用Spark 2.1.2。

我试图了解各种Spark UI选项卡在作业运行时的显示。我使用spark-shell --master local并执行以下join查询:

val df = Seq(
(55, "Canada", -1, "", 0),
(77, "Ontario", 55, "/55", 1),
(100, "Toronto", 77, "/55/77", 2),
(104, "Brampton", 100, "/55/77/100", 3)
).toDF("id", "name", "parentId", "path", "depth")

val dfWithPar = df.as("df1").
join(df.as("df2"), $"df1.parentId" === $"df2.Id", "leftouter").
select($"df1.*", $"df2.name" as "parentName")

dfWithPar.show

这是物理查询计划:
== Physical Plan ==
*Project [Id#11, name#12, parentId#13, path#14, depth#15, name#25 AS parentName#63]
+- *BroadcastHashJoin [parentId#13], [Id#24], LeftOuter, BuildRight
:- LocalTableScan [Id#11, name#12, parentId#13, path#14, depth#15]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
+- LocalTableScan [Id#24, name#25]

我有两个关于查询执行的问题。
  • 为什么要查询两个作业?

    Spark job view
  • 为什么两个作业的工作台 View 都相同?下面是作业ID 1的阶段 View 的屏幕截图,它与作业ID 0完全相同。为什么?

    Stage view of Stage 1 which is exactly same as Stage 0
  • 最佳答案

    我使用 Spark 2.3.0 来回答您的问题(实际上是 2.3.1-SNAPSHOT ),因为它在撰写本文时是最新的也是最大的。关于查询执行(如果有任何重要内容)的更改几乎没有什么变化,因为您的2.1.2和我的2.3.0中的物理查询计划完全相同(括号中的per-query codegen stage ID除外)。

    dfWithPar.show之后,结构化查询(使用Spark SQL的Scala数据集API构建的查询)已针对以下物理查询计划进行了优化(为了更好的理解,我将其包含在答案中)。

    scala> dfWithPar.explain
    == Physical Plan ==
    *(1) Project [Id#11, name#12, parentId#13, path#14, depth#15, name#24 AS parentName#58]
    +- *(1) BroadcastHashJoin [parentId#13], [Id#23], LeftOuter, BuildRight
    :- LocalTableScan [Id#11, name#12, parentId#13, path#14, depth#15]
    +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
    +- LocalTableScan [Id#23, name#24]

    Spark 作业数

    Why are there two jobs for the query?



    我想说甚至还有三个Spark作业。

    Spark jobs of the broadcast join query in web UI

    tl; dr 一个Spark作业用于 BroadcastHashJoinExec物理运算符,而其他两个作业用于 Dataset.show

    为了了解查询执行和结构化查询的Spark作业数量,了解结构化查询(使用数据集API描述)和RDD API之间的区别非常重要。

    Spark SQL的数据集和Spark Core的RDD都描述了Spark中的分布式计算。 RDD是Spark的“汇编程序”语言(类似于JVM字节码),而数据集是使用类似SQL的语言(与我之前使用的JVM字节码相比类似于Scala或Java的JVM语言)对结构化查询的更高级描述。

    重要的是,使用数据集API的结构化查询最终最终将成为基于RDD的分布式计算(可以与Java或Scala编译器将高级语言转换为JVM字节码的方式进行比较)。

    数据集API是RDD API的抽象,当您在DataFrame或Dataset上调用操作时,该操作会将它们转换为RDD。

    这样,您不应该对 Dataset.show最终调用RDD操作而感到惊讶,该操作将依次运行零个,一个或多个Spark作业。

    最后, Dataset.show(默认 numRows等于20)调用 showString,即 take(numRows + 1)以获得 Array[Row]
    val takeResult = newDf.select(castCols: _*).take(numRows + 1)

    换句话说, dfWithPar.show()等效于 dfWithPar.take(21),就涉及Spark作业的数量而言,其又等效于 dfWithPar.head(21)

    您可以在“SQL”选项卡中查看它们及其作业数。他们都应该平等。

    SQL tab in web UI
    showtakehead都导致 collectFromPlan触发Spark作业(通过调用 executeCollect)。

    您应该确定要回答有关作业数量的问题是要知道查询中所有物理运算符的工作方式。您只需要知道它们在运行时的行为以及它们是否完全触发Spark作业即可。

    enter image description here

    BroadcastHashJoin和BroadcastExchangeExec物理运算符

    可以广播联接的右侧时,将使用 BroadcastHashJoinExec二进制物理运算符(其确切是 spark.sql.autoBroadcastJoinThreshold,默认情况下为 10M)。
    BroadcastExchangeExec一元物理运算符用于将行(具有某种关系)的行广播到辅助节点(以支持 BroadcastHashJoinExec)。

    当执行 BroadcastHashJoinExec(生成 RDD[InternalRow])时, creates a broadcast variable依次执行 BroadcastExchangeExec(在 separate thread上)。

    这就是运行ThreadPoolExecutor.java:1149 Spark作业0的 的原因。

    如果执行以下操作,则可以看到运行了单个Spark作业0:
    // Just a single Spark job for the broadcast variable
    val r = dfWithPar.rdd

    这就要求执行结构化查询以生成RDD,然后将该RDD作为要提供最终结果的操作的目标。

    enter image description here

    如果您没有以广播联接查询结束,那么您将没有Spark作业。

    RDD.take运算符

    我在回答问题的第一刻就错过了数据集运算符,即showtakehead最终会导致RDD.take

    take(num: Int): Array[T] Take the first num elements of the RDD. It works by first scanning one partition, and use the results from that partition to estimate the number of additional partitions needed to satisfy the limit.



    请注意,当take说“它首先扫描一个分区,然后使用该分区的结果来估计满足限制所需的其他分区的数量时,即可工作”。这是了解广播联接查询中Spark作业数量的关键。

    每次迭代(在上面的描述中)都是separate Spark job starting with the very first partition and 4 times as many every following iteration:
    // RDD.take
    def take(num: Int): Array[T] = withScope {
    ...
    while (buf.size < num && partsScanned < totalParts) {
    ...
    val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
    ...
    }
    }

    看看下面的21行RDD.take
    // The other two Spark jobs
    r.take(21)

    与查询中一样,您将获得2个Spark作业。

    enter image description here

    猜猜如果执行dfWithPar.show(1),将有多少个Spark作业。

    为什么阶段是相同的?

    Why are the stage view shown for both jobs identical? Below is a screenshot of the stage view of job id 1 which is exactly the same of job id 0. Why?



    这很容易回答,因为两个Spark作业都来自RDD.take(20)

    第一个Spark作业是扫描第一个分区,并且由于它没有足够的行而导致另一个Spark作业来扫描更多的分区。

    关于apache-spark - 如何知道(广播)联接查询中的Spark作业和阶段数?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49385724/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com