gpt4 book ai didi

scala - 在 Spark SQL 中聚合大型数据集

转载 作者:行者123 更新时间:2023-12-04 20:29:16 24 4
gpt4 key购买 nike

考虑以下代码:

case class Person(
personId: Long, name: String, ageGroup: String, gender: String,
relationshipStatus: String, country: String, state: String
)

case class PerPersonPower(personId: Long, power: Double)

val people: Dataset[Person] = ... // Around 50 million entries.
val powers: Dataset[PerPersonPower] = ... // Around 50 million entries.

people.join(powers, "personId")
.groupBy("ageGroup", "gender", "relationshipStatus", "country", "state")
.agg(
sum("power").alias("totalPower"),
count("*").alias("personCount")
)

它在具有大约 100 GB RAM 的集群上执行。但是,集群内存不足。我不知道该怎么做。实际上, people$"personId" 分区并缓存 -- people.repartition($"personId").cache() .

有什么想法可以优化这个计算吗?

集群是 Vanilla Google Dataproc cluster --- 所以它在客户端模式下使用 YARN --- 由 14 个节点组成,每个节点有 8 GB RAM。

最佳答案

根据请求中可用的有限信息,我可以建议不要使用缓存并创建比默认数量更多的分区(通常为 200,但可能因集群而异) - 尝试设置 spark.shuffle.partitions在您的应用程序中以 1000 或 2000 开始。它可以像 spark.conf.set('spark.shuffle.partitions', 1000) 那样完成.很可能您的查询命中了 SortMergeJoin 并且当前执行程序获得了更多的数据,它是堆减去 YARN 开销。请咨询您的SparkUI for the cluster以便监控和优化您的查询执行。在 SQL 选项卡中,您将看到有关每个阶段正在处理的数据量的非常详细的数字,因此您将识别瓶颈并更快地修复它们。

Spark 查询规划器将首先排序 PerPersonPowerPerson通过在 spark.shuffle.partitions 中定义的人数中的 personId ,将其刷新到 HDFS 到 spark.shuffle.partitions单独的 Parquet 文件,然后创建相同数量的部分聚合并将它们放入生成的数据帧中。

似乎您加入了大约 18-20GB(人)的数据,大约 800MB(功率)。如果功率会小一点,您可以尝试使用 BroadcastHashJoin喜欢 people.join(broadcast(powers), "personId") ,尽管我不建议广播大于 128Mb 或 256Mb 的数据帧。

祝你好运!

关于scala - 在 Spark SQL 中聚合大型数据集,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49936862/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com