gpt4 book ai didi

apache-spark - Apache Spark - shuffle 写入的数据多于输入数据的大小

转载 作者:行者123 更新时间:2023-12-04 04:36:58 25 4
gpt4 key购买 nike

我在本地模式下使用 Spark 2.1,我正在运行这个简单的应用程序。

val N = 10 << 20

sparkSession.conf.set("spark.sql.shuffle.partitions", "5")
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", (N + 1).toString)
sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")

val df1 = sparkSession.range(N).selectExpr(s"id as k1")
val df2 = sparkSession.range(N / 5).selectExpr(s"id * 3 as k2")

df1.join(df2, col("k1") === col("k2")).count()

在这里,范围(N)创建了一个 Long 数据集(具有唯一值),所以我假设的大小

  • df1 = N * 8 bytes ~ 80MB
  • df2 = N / 5 * 8 bytes ~ 16MB


好的,现在让我们以 df1 为例。
df1 由 8 个分区组成 shuffledRDDs 为 5 ,所以我假设

  • # of mappers (M) = 8
  • # of reducers (R) = 5


由于分区数量较少,Spark 将使用 Hash Shuffle 将创建 M * R 文件 在磁盘中,但我不明白是否每个文件都有所有数据,因此 每个文件大小 = 数据大小 导致 M * R * 数据大小 文件或 all_files = data_size .

然而,当执行这个应用程序时,随机写入 df1 = 160MB 这与上述任何一种情况都不匹配。

Spark UI

我在这里缺少什么?为什么shuffle写入数据的大小翻了一番?

最佳答案

首先我们来看看data size total(min, med, max)方法:
根据 SQLMetrics.scala#L88ShuffleExchange.scala#L43 , data size total(min, med, max)我们看到的是dataSize的最终值shuffle 的度量。那么,它是如何更新的呢?每次序列化记录时它都会更新:UnsafeRowSerializer.scala#L66来自 dataSize.add(row.getSizeInBytes) ( UnsafeRow 是 Spark SQL 中记录的内部表示)。
内部,UnsafeRowbyte[] 支持,并在序列化期间直接复制到底层输出流,其 getSizeInBytes()方法只返回 byte[] 的长度.因此,最初的问题转化为:为什么字节表示是唯一 long 的两倍大一栏有记录吗?此 UnsafeRow.scala医生给了我们答案:

Each tuple has three parts: [null bit set] [values] [variable length portion]

The bit set is used for null tracking and is aligned to 8-byte word boundaries. It stores one bit per field.


因为它是 8 字节字对齐的,所以唯一的 1 个空位占用了另一个 8 字节,与长列的宽度相同。因此,每个 UnsafeRow使用 16 个字节表示您的一长列行。

关于apache-spark - Apache Spark - shuffle 写入的数据多于输入数据的大小,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44058881/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com