gpt4 book ai didi

scala - Spark UI DAG 阶段已断开连接

转载 作者:行者123 更新时间:2023-12-02 16:09:22 25 4
gpt4 key购买 nike

我在spark-shell中运行了以下工作:

val d = sc.parallelize(0 until 1000000).map(i => (i%100000, i)).persist
d.join(d.reduceByKey(_ + _)).collect

Spark UI 显示三个阶段。第 4 阶段和第 5 阶段对应于 d 的计算,第 6 阶段对应于 collect 操作的计算。由于 d 是持久的,我预计只有两个阶段。然而,第 5 阶段并未与任何其他阶段连接。

Spark UI DAG

因此尝试在不使用持久化的情况下运行相同的计算,并且 DAG 看起来完全相同,除了没有指示 RDD 已持久化的绿点。

Spark UI DAG without persist

我希望第 11 级的输出连接到第 12 级的输入,但事实并非如此。

查看阶段描述,这些阶段似乎表明 d 正在被持久化,因为阶段 5 有输入,但我仍然对为什么阶段 5 存在感到困惑。

Spark UI stages

Spark UI stages without persist

最佳答案

  1. 输入RDD被缓存,并且缓存的部分不会重新计算。

    这可以通过简单的测试来验证:

    import org.apache.spark.SparkContext

    def f(sc: SparkContext) = {
    val counter = sc.longAccumulator("counter")
    val rdd = sc.parallelize(0 until 100).map(i => {
    counter.add(1L)
    (i%10, i)
    }).persist
    rdd.join(rdd.reduceByKey(_ + _)).foreach(_ => ())
    counter.value
    }

    assert(f(spark.sparkContext) == 100)
  2. 缓存不会从 DAG 中删除阶段。

    如果数据被缓存对应阶段can be marked as skipped但仍然是 DAG 的一部分。可以使用检查点截断谱系,但这不是同一件事,并且它不会从可视化中删除阶段。

  3. 输入阶段包含的不仅仅是缓存的计算。

    Spark 阶段将操作组合在一起,这些操作可以链接起来而无需执行随机播放。

    虽然输入阶段的一部分被缓存,但它并没有涵盖准备随机文件所需的所有操作。这就是您看不到跳过的任务的原因。

  4. 其余部分(分离)只是图形可视化的限制。

  5. 如果您首先对数据进行重新分区:

    import org.apache.spark.HashPartitioner

    val d = sc.parallelize(0 until 1000000)
    .map(i => (i%100000, i))
    .partitionBy(new HashPartitioner(20))

    d.join(d.reduceByKey(_ + _)).collect

    您将获得您最有可能寻找的 DAG:

    enter image description here

关于scala - Spark UI DAG 阶段已断开连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40636554/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com