gpt4 book ai didi

scala - Spark数据框加入问题

转载 作者:行者123 更新时间:2023-12-02 19:40:45 24 4
gpt4 key购买 nike

下面的代码片段工作正常。(读取 CSV、读取 Parquet 并相互连接)

//Reading csv file -- getting three columns: Number of records: 1
df1=spark.read.format("csv").load(filePath)

df2=spark.read.parquet(inputFilePath)

//Join with Another table : Number of records: 30 Million, total
columns: 15
df2.join(broadcast(df1), col("df2col1") === col("df1col1") "right")

奇怪的是下面的代码片段不起作用。 (读取Hbase、读取Parquet并相互连接)(区别是从Hbase读取)

//Reading from Hbase (It read from hbase properly -- getting three columns: Number of records: 1
df1=read from Hbase code
// It read from Hbase properly and able to show one record.
df1.show

df2=spark.read.parquet(inputFilePath)

//Join with Another table : Number of records: 50 Million, total
columns: 15
df2.join(broadcast(df1), col("df2col1") === col("df1col1") "right")

错误:原因:org.apache.spark.SparkException:作业由于阶段失败而中止:56个任务的序列化结果的总大小(1024.4 MB)大于spark.driver.maxResultSize (1024.0 MB)

然后我添加了spark.driver.maxResultSize=5g,然后开始出现另一个错误,Java堆空间错误(在ThreadPoolExecutor.java运行)。如果我观察 Manager 中的内存使用情况,我会发现使用情况一直在上升,直到达到约 50GB,此时会发生 OOM 错误。因此,无论出于何种原因,用于执行此操作的 RAM 量大约比我尝试使用的 RDD 大小大 10 倍。

如果我将 df1 保留在内存和磁盘中并执行 count()。程序运行良好。代码片段如下

//Reading from Hbase -- getting three columns: Number of records: 1
df1=read from Hbase code

**df1.persist(StorageLevel.MEMORY_AND_DISK)
val cnt = df1.count()**

df2=spark.read.parquet(inputFilePath)

//Join with Another table : Number of records: 50 Million, total
columns: 15
df2.join(broadcast(df1), col("df2col1") === col("df1col1") "right")

它可以与文件一起使用,即使它具有相同的数据,但不能与 Hbase 一起使用。在 100 个工作节点集群上运行,每个工作节点有 125 GB 内存。所以内存不是问题。

我的问题是文件和 Hbase 都有相同的数据,并且都读取并能够 show() 数据。但为什么只有 Hbase 失败了。我正在努力理解这段代码可能出了什么问题。任何建议将不胜感激。

最佳答案

当提取数据时,spark 不知道从 HBase 检索的行数,因此选择的策略是排序合并连接。

因此它尝试对执行器之间的数据进行排序和洗牌。

为了避免这个问题,我们可以使用广播连接,同时我们不习惯使用键列对 df2 中的数据进行排序和洗牌,该键列显示代码片段中的最后一条语句。

但是为了绕过这个(因为它只有一行),我们可以使用 Case 表达式来填充列。

示例:

df.withColumn(
"newCol"
,when(col("df2col1").eq(lit(hbaseKey))
,lit(hbaseValueCol1))
.otherwise(lit(null))

关于scala - Spark数据框加入问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55093163/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com