gpt4 book ai didi

apache-spark - Spark 流: How to periodically refresh cached RDD?

转载 作者:行者123 更新时间:2023-12-03 07:15:17 25 4
gpt4 key购买 nike

在我的 Spark 流应用程序中,我想根据从后端 (ElasticSearch) 检索的字典来映射值。我想定期刷新字典,以防它在后端更新。它类似于 Logstash 转换过滤器的定期刷新功能。我如何使用 Spark 实现这一点(例如,以某种方式每 30 秒取消持久化 RDD)?

最佳答案

我发现最好的方法是重新创建 RDD 并维护对其的可变引用。 Spark Streaming 的核心是 Spark 之上的调度框架。我们可以搭载调度程序来定期刷新 RDD。为此,我们使用一个空的 DStream,仅为刷新操作安排它:

def getData():RDD[Data] = ??? function to create the RDD we want to use af reference data
val dstream = ??? // our data stream

// a dstream of empty data
val refreshDstream = new ConstantInputDStream(ssc, sparkContext.parallelize(Seq())).window(Seconds(refreshInterval),Seconds(refreshInterval))

var referenceData = getData()
referenceData.cache()
refreshDstream.foreachRDD{_ =>
// evict the old RDD from memory and recreate it
referenceData.unpersist(true)
referenceData = getData()
referenceData.cache()
}

val myBusinessData = dstream.transform(rdd => rdd.join(referenceData))
... etc ...

过去,我也只尝试过交错使用 cache()unpersist() ,但没有结果(它只刷新一次)。重新创建 RDD 会删除所有沿袭并提供新数据的干净负载。

关于apache-spark - Spark 流: How to periodically refresh cached RDD?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37638519/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com