gpt4 book ai didi

apache-spark - 在 Spark 中加入多个表的有效方法 - 设备上没有剩余空间

转载 作者:行者123 更新时间:2023-12-01 07:08:41 25 4
gpt4 key购买 nike

有人问过类似的问题 here ,但它没有正确解决我的问题。我有近 100 个数据帧,每个数据帧至少有 200,000行,我需要加入它们,通过做 full基于列加入 ID ,从而创建一个带有列的 DataFrame - ID, Col1, Col2,Col3,Col4, Col5..., Col102 .

只是为了说明,我的 DataFrames 的结构 -

df1 =                          df2 =            df3 =          .....  df100 = 
+----+------+------+------+ +----+------+ +----+------+ +----+------+
| ID| Col1| Col2| Col3| | ID| Col4| | ID| Col5| | ID|Col102|
+----+------+-------------+ +----+------+ +----+------+ +----+------+
| 501| 25.1| 34.9| 436.9| | 501| 22.33| | 503| 22.33| | 501| 78,1|
| 502| 12.2|3225.9| 46.2| | 502| 645.1| | 505| 645.1| | 502| 54.9|
| 504| 754.5| 131.0| 667.3| | 504| 547.2| | 504| 547.2| | 507| 0|
| 505|324.12| 48.93| -1.3| | 506| 2| | 506| 2| | 509| 71.57|
| 506| 27.51| 88.99| 67.7| | 507| 463.7| | 507| 463.7| | 510| 82.1|
.
.
+----+------+------|------| |----|------| |----|------| |----|------|

我通过做 full 开始加入这些数据帧依次加入所有这些。自然,这是一个计算密集型的过程,必须努力减少 shuffles 的数量。跨不同的工作节点。因此,我首先对 DataFrame 进行分区 df1基于 ID使用 repartition() , 其中 hash-partitions基于 ID的DataFrame分成 30 个分区 -
df1 = df1.repartition(30,'ID')

现在,我做了一个 full加入 df1df2 .
df = df1.join(df2,['ID'],how='full')
df.persist()

df1已经 hash-partitioned ,所以我曾预料到这个 join以上将跳过洗牌并保持 partitionerdf1 ,但我注意到一个 shuffle确实发生了,它增加了 df 上的分区数至 200 .现在,如果我通过如下所示的函数调用它们来继续加入后续的数据帧,我会收到错误 java.io.IOException: No space left on device ——
def rev(df,num):
df_temp = spark.read.load(filename+str(num)+'.csv')
df_temp.persist()
df = df.join(df_temp,['ID'],how='full')
df_temp.unpersist()
return df

df = rev(df,3)
df = rev(df,4)
.
.
df = rev(df,100)
# I get the ERROR here below, when I call the first action count() -
print("Total number of rows: "+str(df.count()))
df.unpersist() # Never reached this stage.

更新:错误信息 -
Py4JJavaError: An error occurred while calling o3487.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 42 in stage 255.0 failed 1 times, most recent failure: Lost task 42.0 in stage 255.0 (TID 8755, localhost, executor driver): java.io.IOException: No space left on device
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)

问题: 1. 为什么 df1的分区器我们做第一个时没有维护 join ?

2.如何有效地连接这些多个表并避免这种情况 No space left on device问题?用户@silvio here建议使用 .bucketBy() ,但他也提到了分区程序将被维护的事实,但事实并非如此。因此,我不确定连接这些多个 DataFrame 的有效方法是什么。

任何建议/提示将不胜感激。

最佳答案

第一次尝试使用 for 循环(您可能已经拥有)每 N 次迭代保留您的大 df

第二次尝试通过设置 sqlContext.sql("set spark.sql.shuffle.partitions=100") 来控制默认分区号而不是默认值 200。

您的代码应如下所示:

num_partitions = 10
big_df = spark.createDataFrame(...) #empty df
for i in range(num_partitions):
big_df = big_df.join(df, ....)

if i % num_partitions == 0:
big_df = big_df.persist()

在这里,我每 10 次迭代调用一次 persist,您当然可以根据您的工作行为调整该数字。

编辑:
在您的情况下,您将本地 df_temp 保留在 rev 函数中,而不是包含所有先前连接的整个数据帧(在您的情况下为 df )。这对最终执行计划没有影响,因为它是本地持久化。至于我的建议,让我们假设您总共需要 100 个连接,然后使用上面的代码,您应该遍历循环 [1..100] 并每 10 次迭代保留累积的结果。持久化大数据帧后,DAG 将包含较少的内存计算,因为中间步骤将被存储,Spark 知道如何从存储中恢复它们,而不是从头开始重新计算所有内容。

关于apache-spark - 在 Spark 中加入多个表的有效方法 - 设备上没有剩余空间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55164274/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com