gpt4 book ai didi

dataframe - Spark SQL 1.6.0 - 简单查询的大量内存使用

转载 作者:行者123 更新时间:2023-12-05 05:22:35 25 4
gpt4 key购买 nike

我在 EMR 4.3 上使用 Spark 1.6 来查询属于 Hive Metastore 中的表的约 15TB 数据(由 S3 中的 gzipped parquet 文件支持)。对于我的集群,我有一个 r3.8xlarge 主节点和 15 个 r3.8xlarge 核心节点(3.6TB RAM,9.6TB SSD)。

大约 15TB 的数据包含在大约 90 亿行中。每行有大约 15 列存储长度为 5-50 的字符串,一列包含大约 30 个字符串的数组,每个字符串有 10-20 个字符。数组中只存储了约 100 万个唯一字符串。我想做的就是计算数组列中的唯一字符串,但似乎我内存不足,因为我不断收到:OutOfMemoryError: unable to create new native thread on the executors .由于内存不足错误,任务失败,执行程序被禁用,然后作业失败。

它在我查询 5-10TB 的数据时有效。我一定不能正确理解存储在内存中的内容(这就是我想要弄清楚的)。顺便说一句,对于上面的集群,我正在设置:

spark.executor.memory 30g
spark.executor.cores 5
spark.executor.instances 90 // 6 instances per r3.8xlarge host

我没想到 Spark SQL 会将中间表存储在内存中。由于不超过 1M 的唯一字符串,我认为带有计数的字符串应该很容易放入内存中。这是查询:

val initial_df = sqlContext.sql("select unique_strings_col from Table where timestamp_partition between '2016-09-20T07:00:00Z' and '2016-09-23T07:00:00Z'")
initial_df.registerTempTable("initial_table") // ~15TB compressed data to read in from S3

val unique_strings_df = sqlContext.sql("select posexplode(unique_strings_col) as (string_pos, string) from initial_table").select($"string_pos", $"string")
unique_strings_df.registerTempTable("unique_strings_table") // ~70% initial data remaining at this point

val strings_count_df = sqlContext.sql("select string, count(*) as unique_string_count from unique_strings_table where string_pos < 21 group by string order by unique_string_count desc") // ~50% initial data remaining at this point
strings_count_df.write.parquet("s3://mybucket/counts/2016-09-20-2016-09-23")

压缩的 parquet 文件很小(比如每个 5mb)。看起来它们可以一次读取一个,过滤并与它们的计数一起存储。我错过了什么?

最佳答案

所以事实证明我需要有足够的磁盘+内存空间来存储初始的RDD。如果我在创建临时表之前在初始 RDD 中进行更多的前期过滤,我就能够成功运行查询。耶!

关于dataframe - Spark SQL 1.6.0 - 简单查询的大量内存使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40035447/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com