gpt4 book ai didi

scala - Spark 中的性能调整

转载 作者:行者123 更新时间:2023-12-05 02:55:12 28 4
gpt4 key购买 nike

我正在运行一个处理大约 2 TB 数据的 spark 作业。处理涉及:

  1. 读取数据(avrò 文件)
  2. 在 map 类型的列上展开
  3. OrderBy 分解列中的键
  4. 过滤 DataFrame(我有一组非常小的(7)键(称之为键集),我想为之过滤 df)。我做了一个 df.filter(col("key").isin(keyset: _*) )
  5. 我将这个 df 写入 Parquet (这个数据框非常小)
  6. 然后我再次为所有不在键集中的键过滤原始数据帧 df.filter(!col("key").isin(keyset: _*) ) 并将其写入 Parquet 。这是更大的数据集。

原始avro数据大约2TB。处理大约需要 1 小时。我想优化它。我在第 3 步之后缓存数据帧,使用 6000 的洗牌分区大小。最小执行器 = 1000,最大 = 2000,执行器内存 = 20 G,执行器核心 = 2。还有其他优化建议吗?左连接会比过滤器性能更好吗?

最佳答案

在我看来一切都很好。如果您的数据集较小,则 isin 没问题。

1) 确保您可以增加核心数。执行器核心=5

不建议每个执行器使用超过 5 个内核。这是基于一项研究,其中任何具有 5 个以上并发线程的应用程序都会开始影响性能。

2) 确保您拥有良好/统一的分区结构。

示例(仅用于调试目的,不用于生产):

  import org.apache.spark.sql.functions.spark_partition_id
yourcacheddataframe.groupBy(spark_partition_id).count.show()

这将打印 spark 分区号和多少条记录存在于每个分区中。如果您不想要更多并行性,您可以基于此重新分区。

3) spark.dynamicAllocation.enabled 可能是另一种选择。

例如:

spark-submit --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=100 --conf spark.shuffle.service.enabled=true

连同所有其他必需的 Prop ..... 这就是这项工作的目的。如果您在 spark-default.conf 中提供这些 Prop ,它将应用于所有作业。

使用上述所有这些选项,您的处理时间可能会缩短。

关于scala - Spark 中的性能调整,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61428896/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com