gpt4 book ai didi

scala - 如何优化 Spark 以将大量数据写入 S3

转载 作者:行者123 更新时间:2023-12-04 11:30:58 26 4
gpt4 key购买 nike

我在 EMR 上使用 Apache Spark 进行了大量 ETL。

我对获得良好性能所需的大部分调整都相当满意,但我有一项似乎无法弄清楚的工作。

基本上,我正在获取大约 1 TB 的 Parquet 数据 - 分布在 S3 中的数万个文件中 - 并添加几列并将其写出按数据的日期属性之一分区 - 同样,在 S3 中格式化的 Parquet 。

我是这样跑的:

spark-submit --conf spark.dynamicAllocation.enabled=true  --num-executors 1149 --conf spark.driver.memoryOverhead=5120 --conf  spark.executor.memoryOverhead=5120 --conf  spark.driver.maxResultSize=2g --conf spark.sql.shuffle.partitions=1600 --conf spark.default.parallelism=1600 --executor-memory 19G --driver-memory 19G --executor-cores 3 --driver-cores 3 --class com.my.class path.to.jar <program args>

簇的大小是根据输入数据集的大小动态确定的,num-executors、spark.sql.shuffle.partitions、spark.default.parallelism参数是根据簇的大小计算的。

代码大致是这样的:
va df = (read from s3 and add a few columns like timestamp and source file name)

val dfPartitioned = df.coalesce(numPartitions)

val sqlDFProdDedup = spark.sql(s""" (query to dedup against prod data """);

sqlDFProdDedup.repartition($"partition_column")
.write.partitionBy("partition_column")
.mode(SaveMode.Append).parquet(outputPath)

当我查看神经节图表时,我在重复数据删除逻辑运行和一些数据混洗时获得了巨大的资源峰值,但随后实际写入数据仅使用一小部分资源并运行数小时。

我不认为主要问题是分区倾斜,因为数据应该公平地分布在所有分区中。

分区列本质上是一个月中的一天,因此每个作业通常只有 5-20 个分区,具体取决于输入数据集的跨度。每个分区通常在 10-20 个 Parquet 文件中包含大约 100 GB 的数据。

我正在设置 spark.sql.files.maxRecordsPerFile 来管理这些输出文件的大小。

所以,我的大问题是:我怎样才能提高这里的性能?

简单地添加资源似乎没有多大帮助。

我曾尝试使执行程序更大(以减少混洗)并增加每个执行程序的 CPU 数量,但这似乎无关紧要。

提前致谢!

最佳答案

Zack,我有一个类似的用例,每天要处理 'n' 倍的文件。我将假设您按原样使用上面的代码并试图提高整体工作的性能。以下是我的一些观察:

  • 不知道是什么coalesce(numPartitions)数字实际上是以及为什么在重复数据删除过程之前使用它。您的 spark-submit 显示您正在创建 1600 个分区,这足以开始。
  • 如果您要在写入之前重新分区,那么上面的合并可能根本没有好处,因为重新分区会打乱数据。
  • 由于您声称要编写 10-20 个 Parquet 文件,这意味着您在工作的最后一部分只使用了 10-20 个内核,这是其运行缓慢的主要原因。基于 100 GB 的估计, Parquet 文件的范围从大约 5GB 到 10 GB,这真的很大,我怀疑人们是否能够在本地笔记本电脑或 EC2 机器上打开它们,除非他们使用 EMR 或类似的(如果阅读,则具有巨大的执行程序内存)整个文件或溢出到磁盘),因为内存要求太高。我会建议创建大约 1GB 的 Parquet 文件以避免任何这些问题。

  • 此外,如果您创建 1GB Parquet 文件,您可能会加快进程 5 到 10 倍,因为您将使用更多的执行程序/内核来并行编写它们。您实际上可以通过简单地使用默认分区编写数据帧来运行实验。

    这让我明白你真的不需要像你想写的那样使用重新分区。partitionBy("partition_date") 调用。您的 repartition() call 实际上强制数据帧最多只有 30-31 个分区,具体取决于当月的天数,这是驱动写入文件数量的原因。 write.partitionBy("partition_date")实际上是在 S3 分区中写入数据,如果您的数据帧有 90 个分区,它的写入速度将提高 3 倍(3 * 30)。 df.repartition()迫使它放慢速度。您真的需要 5GB 或更大的文件吗?
  • 另一个重点是 Spark 惰性求值有时太聪明了。在您的情况下,它很可能仅使用基于 repartition(number) 的整个程序的执行程序数量。 .相反,您应该尝试,df.cache() -> df.count() and then df.write() .它的作用是强制 spark 使用所有可用的执行程序核心。我假设您正在并行读取文件。在您当前的实现中,您可能使用 20-30 个内核。有一点需要注意,当您使用 r4/r5 机器时,请随时将您的 executor 内存增加到 48G 和 8 核。我发现 8 核对我的任务来说更快,而不是标准的 5 核推荐。
  • 另一个指针是尝试 ParallelGC 而不是 G1GC。对于这样的用例,当您读取 1000 倍的文件时,我注意到它的性能比 G1Gc 好或不差。请试一试。

  • 在我的工作量中,我使用 coalesce(n)基于方法,其中 'n' 给我一个 1GB 的 Parquet 文件。我使用集群上可用的所有内核并行读取文件。只有在写入部分,我的内核才空闲,但您无法避免这种情况。

    我不知道如何 spark.sql.files.maxRecordsPerFilecoalesce() or repartition() 一起使用但我发现 1GB 似乎可以用于 pandas、Redshift 频谱、Athena 等。

    希望能帮助到你。
    查鲁

    关于scala - 如何优化 Spark 以将大量数据写入 S3,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59628550/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com