gpt4 book ai didi

apache-spark - 详细说明为什么 shuffle 写入数据比 apache spark 中的输入数据要多

转载 作者:行者123 更新时间:2023-12-04 12:46:29 26 4
gpt4 key购买 nike

enter image description here

  • 任何人都可以向我详细说明在 Spark UI 中输入、输出、随机读取和随机写入究竟指定了什么?
  • 另外,有人可以解释一下这项工作的输入是如何占随机写入的 25~30% 的吗?
    根据我的理解,shuffle write 是无法保存在内存中的临时数据和在聚合或减少期间需要发送给其他执行程序的数据的总和。

  • 代码如下:

    hiveContext.sql("SELECT * FROM TABLE_NAME WHERE PARTITION_KEY = 'PARTITION_VALUE'")
    .rdd
    .map{case (row:Row)
    =>((row.getString(0), row.getString(12)),
    (row.getTimestamp(11), row.getTimestamp(11),
    row))}
    .filter{case((client, hash),(d1,d2,obj)) => (d1 !=null && d2 !=null)}
    .reduceByKey{
    case(x, y)=>
    if(x._1.before(y._1)){
    if(x._2.after(y._2))
    (x)
    else
    (x._1, y._2, y._3)
    }else{
    if(x._2.after(y._2))
    (y._1, x._2, x._3)
    else
    (y)
    }
    }.count()


    其中 ReadDailyFileDataObject 是一个 case Class,它将行字段作为容器保存。
    需要容器,因为有 30 列,超过了 22 的元组限制。

    更新代码,删除案例类,因为我看到同样的问题,当我使用 Row 本身而不是案例类时。

    现在我看到

    Task : 10/7772

    Input : 2.1 GB

    Shuffle Write : 14.6 GB



    如果有帮助,我正在尝试处理存储为 Parquet 文件的表,其中包含 210 亿行。

    以下是我正在使用的参数,
    "spark.yarn.am.memory" -> "10G"
    "spark.yarn.am.cores" -> "5"
    "spark.driver.cores" -> "5"
    "spark.executor.cores" -> "10"
    "spark.dynamicAllocation.enabled" -> "true"
    "spark.yarn.containerLauncherMaxThreads" -> "120"
    "spark.executor.memory" -> "30g"
    "spark.driver.memory" -> "10g"
    "spark.driver.maxResultSize" -> "9g"
    "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer"
    "spark.kryoserializer.buffer" -> "10m"
    "spark.kryoserializer.buffer.max" -> "2001m"
    "spark.akka.frameSize" -> "2020"

    SparkContext 注册为
    new SparkContext("yarn-client", SPARK_SCALA_APP_NAME, sparkConf)

    在 yarn 上,我看到

    Allocated CPU VCores : 95

    Allocated Memory : 309 GB

    Running Containers : 10

    最佳答案

    将鼠标悬停在 Input 上时显示的提示Output Shuffle Read Shuffle Write很好地解释自己:

    INPUT: Bytes and records read from Hadoop or from Spark storage.

    OUTPUT: Bytes and records written to Hadoop.

    SHUFFLE_WRITE: Bytes and records written to disk in order to be read by a shuffle in a future stage.

    Shuffle_READ: Total shuffle bytes and records read (includes both data read locally and data read from remote executors).


    在您的情况下,150.1GB 占所有 1409 个已完成任务的输入大小(即到目前为止从 HDFS 读取的总大小),874GB 占所有 1409 个已完成任务在节点本地磁盘上的写入。
    您可以引用 What is the purpose of shuffling and sorting phase in the reducer in Map Reduce Programming?很好地理解整体洗牌功能。

    关于apache-spark - 详细说明为什么 shuffle 写入数据比 apache spark 中的输入数据要多,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36281997/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com