gpt4 book ai didi

scala - 递归数据框操作

转载 作者:可可西里 更新时间:2023-11-01 14:56:28 26 4
gpt4 key购买 nike

在我的 spark 应用程序中,我想在循环中对数据帧执行操作并将结果写入 hdfs。

伪代码:

var df = emptyDataframe
for n = 1 to 200000{
someDf=read(n)
df = df.mergeWith(somedf)
}
df.writetohdfs

在上面的示例中,当“mergeWith”执行 unionAll 时,我得到了很好的结果。

但是,当我在“mergeWith”中进行(简单的)连接时,工作变得非常慢(> 1h,有 2 个执行器,每个执行器有 4 个内核)并且永远不会完成(工作自行中止)。

在我的场景中,我对仅包含 ~1mb 文本数据的文件进行了约 50 次迭代。

因为合并顺序对我来说很重要,我怀疑这是由于 DAG 生成导致的,导致整个事情在我存储数据的那一刻运行。

现在我正在尝试在合并的数据框架上使用 .persist,但这似乎也很慢。

编辑:

在作业运行时,我注意到(即使我进行了计数和 .persist)内存中的数据帧看起来不像静态数据帧。它看起来像是一条通往它一直在进行的所有合并的串联路径,有效地线性减慢了工作速度。

我是否可以假设 var df 是造成这种情况的罪魁祸首?

spiraling out of controle

我对问题的分割:

dfA = empty
dfC = dfA.increment(dfB)
dfD = dfC.increment(dfN)....

当我期望 DF' A C 和 D 是对象时,以不同的方式激发事物并且不关心我是否坚持或重新分区。Spark 看起来像这样:

dfA = empty
dfC = dfA incremented with df B
dfD = ((dfA incremented with df B) incremented with dfN)....

更新2

为了摆脱持续不在 DF 上工作的问题,我可以在将 DF 转换为 RDD 并再次转换回来时“打破”沿袭。这有一点开销,但可以接受(工作在几分钟内完成,而不是几小时/从不)我将对持久性进行更多测试,并以解决方法的形式给出答案。

结果:这似乎只能在表面上解决这些问题。实际上,我回到了原点并得到了 OOM 异常java.lang.OutOfMemoryError: GC overhead limit exceeded

最佳答案

如果你有这样的代码:

var df = sc.parallelize(Seq(1)).toDF()

for(i<- 1 to 200000) {
val df_add = sc.parallelize(Seq(i)).toDF()
df = df.unionAll(df_add)
}

然后 df 之后将有 400000 个分区,这使得以下操作效率低下(因为每个分区有 1 个任务)。

尝试减少分区数量,例如200 在保留数据帧之前(使用例如 df.coalesce(200).write.saveAsTable(....))

关于scala - 递归数据框操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40736993/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com