gpt4 book ai didi

apache-spark - 使用 Spark 的 partitionBy 方法对 S3 中的大型倾斜数据集进行分区

转载 作者:行者123 更新时间:2023-12-04 14:54:05 24 4
gpt4 key购买 nike

我正在尝试使用 Spark 和 partitionBy 将一个大的分区数据集写入磁盘。算法正在与我尝试过的两种方法作斗争。

分区严重倾斜 - 一些分区很大,而其他分区很小。

问题 #1 :

当我之前使用重新分区时 repartitionBy , Spark 将所有分区写成一个文件,即使是大分区



val df = spark.read.parquet("some_data_lake")
df
.repartition('some_col).write.partitionBy("some_col")
.parquet("partitioned_lake")

这需要永远执行,因为 Spark 没有并行写入大分区。如果其中一个分区有 1TB 的数据,Spark 会尝试将整个 1TB 的数据写入单个文件。

问题#2 :

当我不使用时 repartition , Spark 写出太多文件。

这段代码将写出数量惊人的文件。

df.write.partitionBy("some_col").parquet("partitioned_lake")

我在一个很小的 ​​8 GB 数据子集上运行它,Spark 写出了 85,000 多个文件!

当我尝试在生产数据集上运行它时,一个包含 1.3 GB 数据的分区被写出为 3,100 个文件。

我想要什么

我希望每个分区都被写成 1 GB 的文件。因此,具有 7 GB 数据的分区将作为 7 个文件写出,而具有 0.3 GB 数据的分区将作为单个文件写出。

我最好的前进道路是什么?

最佳答案

最简单的解决方案是在 repartition 中添加一列或多列并明确设置分区数。

val numPartitions = ???

df.repartition(numPartitions, $"some_col", $"some_other_col")
.write.partitionBy("some_col")
.parquet("partitioned_lake")

在哪里:
  • numPartitions - 应该是写入分区目录的所需文件数量的上限(实际数量可以更低)。
  • $"some_other_col" (和可选的附加列)应该具有高基数并且独立于 $"some_column (这两者之间应该存在函数依赖关系,并且不应该高度相关)。

    如果数据不包含此类列,您可以使用 o.a.s.sql.functions.rand .

    import org.apache.spark.sql.functions.rand

    df.repartition(numPartitions, $"some_col", rand)
    .write.partitionBy("some_col")
    .parquet("partitioned_lake")
  • 关于apache-spark - 使用 Spark 的 partitionBy 方法对 S3 中的大型倾斜数据集进行分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53037124/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com