gpt4 book ai didi

scala - 为什么 Spark DataFrame 创建了错误数量的分区?

转载 作者:行者123 更新时间:2023-12-04 17:53:52 24 4
gpt4 key购买 nike

我有一个包含 2 列的 spark 数据框 - col1col2 .

scala> val df = List((1, "a")).toDF("col1", "col2")
df: org.apache.spark.sql.DataFrame = [col1: int, col2: string]

当我写 dfparquet 中的磁盘上格式,将所有数据写入文件数量等于 col1 中唯一值的数量我做了一个 repartition使用 col1 ,像这样:

scala> df.repartition(col("col1")).write.partitionBy("col1").parquet("file")

以上代码在文件系统中只生成一个文件。但是,shuffle 操作的数量变为 200。

enter image description here

我在这里无法理解一件事,如果 col1仅包含一个值,即 1那么为什么它在 repartition 中创建 200 个分区?

最佳答案

repartition(columnName) 默认创建 200 个分区(更具体地说,spark.sql.shuffle.partitions 分区),无论 的唯一值有多少col1 有。如果 col1 只有 1 个唯一值,则 199 个分区为空。另一方面,如果 col1 的唯一值超过 200 个,则每个分区将有多个 col1 值。

如果您只想要 1 个分区,那么您可以执行 repartition(1,col("col1")) 或仅执行 coalesce(1)。但并不是说 coalesce 的行为与 coalesce 我在你的代码中进一步向上移动可能会失去并行性(参见 How to prevent Spark optimization)

如果你想查看你分区的内容,我为此做了2种方法:

// calculates record count per partition
def inspectPartitions(df: DataFrame) = {
import df.sqlContext.implicits._
df.rdd.mapPartitions(partIt => {
Iterator(partIt.toSeq.size)
}
).toDF("record_count")
}

// inspects how a given key is distributed accross the partition of a dataframe
def inspectPartitions(df: DataFrame, key: String) = {
import df.sqlContext.implicits._
df.rdd.mapPartitions(partIt => {
val part = partIt.toSet
val partSize = part.size
val partKeys = part.map(r => r.getAs[Any](key).toString.trim)
val partKeyStr = partKeys.mkString(", ")
val partKeyCount = partKeys.size
Iterator((partKeys.toArray,partSize))
}
).toDF("partitions","record_count")
}

现在您可以像这样检查你的数据框:

inspectPartitions(df.repartition(col("col1"),"col1")
.where($"record_count">0)
.show

关于scala - 为什么 Spark DataFrame 创建了错误数量的分区?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44878294/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com