gpt4 book ai didi

scala - Spark- 写入 128 MB 大小的 Parquet 文件

转载 作者:行者123 更新时间:2023-12-04 07:42:10 25 4
gpt4 key购买 nike

我有一个超过 10 亿行的 DataFrame (df)

df.coalesce(5)
.write
.partitionBy("Country", "Date")
.mode("append")
.parquet(datalake_output_path)

从上面的命令我了解到我的 100 个工作节点集群 (spark 2.4.5) 中只有 5 个工作节点将执行所有任务。使用 coalesce(5) 需要 7 个小时才能完成。

我应该尝试 repartition 而不是 coalesce 吗?

是否有更快/更有效的方法来写出 128 MB 大小的 Parquet 文件,或者我是否需要先计算数据帧的大小以确定需要多少个分区。

例如,如果我的数据帧大小为 1 GB 并且 spark.sql.files.maxPartitionBytes = 128MB,我应该首先计算 否。所需的分区数为 1 GB/128 MB = 大约(8) 然后执行 repartition(8) 或 coalesce(8)?

想法是在编写时最大化输出中 parquet 文件的大小,并且能够快速(更快)完成此操作。

最佳答案

您可以获取数据帧 df 的大小 (dfSizeDiskMB),方法是持久化数据帧,然后检查 Web UI 上的“存储”选项卡,如 answer 所示。 .有了这些信息和对预期 Parquet 压缩率的估计,您就可以估计达到所需输出文件分区大小所需的分区数,例如

val targetOutputPartitionSizeMB = 128
val parquetCompressionRation = 0.1
val numOutputPartitions = dfSizeDiskMB * parquetCompressionRatio / targetOutputPartitionSizeMB
df.coalesce(numOutputPartitions).write.parquet(path)

请注意 spark.files.maxPartitionBytes在这里不相关,因为它是:

The maximum number of bytes to pack into a single partition when reading files.

(除非 df 是读取输入数据源的直接结果,没有创建中间数据帧。更有可能的是 df 的分区数由 决定spark.sql.shuffle.partitions,是 Spark 用于从连接和聚合创建的数据帧的分区数。

Should I try repartition instead of coalesce?

coalesce 通常更好,因为它可以避免与 repartition 相关的洗牌,但请注意 docs 中的警告根据您的用例,可能会在上游阶段失去并行性。

关于scala - Spark- 写入 128 MB 大小的 Parquet 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67404137/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com