gpt4 book ai didi

scala - Apache Spark 缓存如何处理具有非线性 DAG 的未缓存文件源?

转载 作者:行者123 更新时间:2023-12-01 09:05:43 25 4
gpt4 key购买 nike

考虑下面的例子

val rdd1 = sc.textFile(...)
val rdd2 = sc.textFile(...)

val a = rdd1.doSomeTransformation
val b = rdd1.doAnotherTransformation

val c = rdd2.doSomeTransformation
val d = rdd2.doAnotherTransformation

//nonsense code, just to illustrate that it's all part of a big DAG (or so I think)
val vertices = a.join(b)

val edges = c.join(d) //corrected (thanks Justin!)

val graph = new Graph(vertices, edges) //or something like this

graph.cache()

graph.triplets.collect() // first "materialization"

graph.triplets.collect() // second "materialization"

我的问题是

如果我不缓存 rdd1 和 rdd2,它们会在“第一次实现”期间分别重新加载两次吗?

如果我缓存它们,那么它不会复制数据吗?有没有办法暂时缓存数据?例如缓存一个分区,直到图被缓存,当图被完全缓存时,然后取消持久化创建它的 RDD。那可能吗?

编辑:删除了臃肿的冗长内容并将问题集中在一个主题上。

最佳答案

你是正确的,这将运行两次,因为 DAG 会是这样的:

a = textFile1->doSomeTransformation
b = textFile1->doAnotherTransformation
c = textFile2->doSomeTransformation
d = textFile2->doAnotherTransformation
vertices = textFile1->doSomeTransformation | textFile1.doAnotherTransformation
edges = textFile2->doSomeTransformation | textFile2.doAnotherTransformation

请注意,是的,存在共性,但 afaik Spark 在连接时不处理该问题。 SparkSQL 可能在催化剂优化部分....但我非常怀疑。部分原因是数据的隐式缓存可能会扰乱内存存储计算并逐出您期望存在的缓存数据。您最好的选择是按如下方式重写它:

val rdd1 = sc.textFile(...)
.cache()
val rdd2 = sc.textFile(...)
.cache()

val a = rdd1.doSomeTransformation
val b = rdd1.doAnotherTransformation

val c = rdd2.doSomeTransformation
val d = rdd2.doAnotherTransformation


val vertices = a.join(b)
val edges = c.join(a)
val graph = new Graph(vertices, edges) //or something like this
graph.cache()

graph.triplets.collect() // first "materialization"
graph.triplets.collect() // second "materialization"

rdd1.unpersist()
rdd2.unpersist()

我会仔细检查,但不应该像您担心的那样进行双重缓存。 graph.cache 将搭载 textFile 缓存。

尽管如此,现在我可以专注于您没有链接,而是执行不同的计算这一事实,这是一个可以在配置或其他内容中启用的有趣想法。但是,这种功能有很多极端情况(它是否仅针对该 DAG 持续存在,或者它是否应该意识到 future 可能会进行调用?)。它必须类似于:spark.optimization.cacheDAGCommonalities

综上所述,如果 RDD 是“热的”,我已经看到它在后续请求中急剧下降(即 textFile1 需要 10 分钟,但只需要 3- 4 在下一次迭代中)

关于scala - Apache Spark 缓存如何处理具有非线性 DAG 的未缓存文件源?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28931838/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com