gpt4 book ai didi

apache-spark - 从 DStream RDD 添加新元素到批处理 RDD

转载 作者:行者123 更新时间:2023-12-04 05:00:13 26 4
gpt4 key购买 nike

将 DStream RDD 与 Batch RDD 结合/联合/组合的唯一方法是通过“transform”方法,该方法返回另一个 DStream RDD,因此它在微批处理结束时被丢弃。

有没有办法,例如union Dstream RDD with Batch RDD 生成一个新的 Batch RDD,其中包含 DStream RDD 和 Batch RDD 的元素。

一旦以上述方式创建了这样的 Batch RDD,它是否可以被其他 DStream RDD 使用,例如加入,因为这次结果可能是另一个 DStream RDD

有效地,上述功能将导致定期更新(添加)批处理 RDD 中的元素 - 额外的元素将不断来自 DStream RDD,这些 RDD 会随着每个微批处理不断流入。新到达的 DStream RDD 也将能够与先前更新的 BAtch RDD 结合并产生结果 DStream RDD

几乎类似的东西可以用 updateStateByKey 来实现,但是有没有办法按照这里描述的那样来做呢

最佳答案

另一种方法是将批处理输入转换为 DStream 并将其与流式输入联合。然后你使用 foreachRDD 将它写出来,这是你对其他作业的新批输入。

 val batch = sc.textFile(...)

val ssc = new StreamingContext(sc, Seconds(30))
val stream = ssc.textFileStream(...)

import scala.collection.mutable
val batchStream = ssc.queueStream(mutable.Queue.empty[RDD[String]], oneAtATime = false, defaultRDD = batch)

val union = ssc.union(Seq(stream, batchStream))

union.print()

union.foreachRDD { rdd =>
// Delete previous, or use SchemaRDD with .insertInto(, overwrite = true)
rdd.saveTextFile(...)
}

ssc.start()
ssc.awaitTermination()

关于apache-spark - 从 DStream RDD 添加新元素到批处理 RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29660547/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com