gpt4 book ai didi

scala - Spark 中两个大型数据集之间的交叉连接

转载 作者:行者123 更新时间:2023-12-01 01:53:39 25 4
gpt4 key购买 nike

我有 2 个大型数据集。第一个数据集包含大约 1.3 亿个条目。
第二个数据集包含大约 40000 个条目。数据是从 MySQL 表中获取的。

我需要做一个交叉连接,但我得到了

java.sql.SQLException: GC overhead limit exceeded

在 Scala 中执行此操作的最佳技术是什么?

以下是我的代码片段:

val df1 = (spark.read.jdbc(jdbcURL,configurationLoader.mysql_table1,"id",100,100000,40, MySqlConnection.getConnectionProperties))
val df2 = (spark.read.jdbc(jdbcURL,configurationLoader.mysql_table2, MySqlConnection.getConnectionProperties))
val df2Cache = df2.repartition(40).cache()
val crossProduct = df1.join(df2Cache)

df1 是较大的数据集,df2 是较小的数据集。

最佳答案

130M*40K = 52 万亿条记录是存储此数据所需的 52 TB 内存,这是假设每条记录为 1 字节,这肯定不是真的。如果它多达 64 字节(我认为这也是一个非常保守的估计),您将需要 3.32 PB (!) 的内存来存储数据。这是一个非常大的数量,因此除非您有一个非常大的集群并且该集群内有非常快的网络,否则您可能需要重新考虑您的算法以使其正常工作。

也就是说,当您对两个 SQL 数据集/数据帧进行连接时,Spark 用于存储连接结果的分区数量由 spark 控制。 sql.shuffle.partitions 属性(参见 here )。您可能希望将其设置为一个非常大的数字,并将执行程序的数量设置为您可以设置的最大数量。然后您可能能够将处理运行到最后。

此外,您可能需要查看 spark.shuffle.minNumPartitionsToHighlyCompress选项;如果您将它设置为小于您的随机分区数,您可能会得到另一个内存提升。请注意,在最近的 Spark 版本之前,此选项是一个硬编码常量设置为 2000,因此根据您的环境,您只需将 spark.sql.shuffle.partitions 设置为大于 2000 的数字即可使用它。

关于scala - Spark 中两个大型数据集之间的交叉连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54154181/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com