
apache-spark - How to periodically update an RDD in Spark Streaming


My code looks like this:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext()
ssc = StreamingContext(sc, 30)

initRDD = sc.parallelize('path_to_data')
lines = ssc.socketTextStream('localhost', 9999)
res = lines.transform(lambda x: x.join(initRDD))

res.pprint()

My problem is that initRDD needs to be updated every day at midnight.

I tried this:

sc = SparkContext()
ssc = StreamingContext(sc, 30)

lines = ssc.socketTextStream('localhost', 9999)


def func(rdd):
    initRDD = rdd.context.parallelize('path_to_data')
    return rdd.join(initRDD)


res = lines.transform(func)

res.pprint()

But it seems that initRDD then gets updated every 30 seconds, i.e. once per batchDuration.

Is there a good way to do this?

Best Answer

One option is to check a deadline before the transform. The check is a simple comparison, so doing it at every batch interval is cheap:

import java.time.{LocalDate, ZoneOffset}

// assumes a SparkSession (sparkSession) and a StreamingContext (ssc) are already available

def nextDeadline(): Long = {
  // assumes midnight on UTC timezone.
  LocalDate.now.atStartOfDay().plusDays(1).toInstant(ZoneOffset.UTC).toEpochMilli()
}

// Note this is a mutable variable!
var initRDD = sparkSession.read.parquet("/tmp/learningsparkstreaming/sensor-records.parquet")
// Note this is a mutable variable!
var _nextDeadline = nextDeadline()

val lines = ssc.socketTextStream("localhost", 9999)

// we use the foreachRDD as a scheduling trigger.
// We don't use the data, only the execution hook
lines.foreachRDD { _ =>
  if (System.currentTimeMillis > _nextDeadline) {
    initRDD = sparkSession.read.parquet("/tmp/learningsparkstreaming/sensor-records.parquet")
    _nextDeadline = nextDeadline()
  }
}

// if the rdd was updated, it will be picked up in this stage.
val res = lines.transform(rdd => rdd.join(initRDD))
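
Since the question is written in PySpark while the answer above is Scala, here is a rough PySpark sketch of the same pattern. It is only an illustration: 'path_to_data' is the placeholder path from the question, the use of sc.textFile to reload the reference data and the state dict that holds the mutable references are assumptions, and the join still presupposes that both RDDs contain key/value pairs.

from datetime import datetime, timedelta, timezone

from pyspark import SparkContext
from pyspark.streaming import StreamingContext


def next_deadline_ms():
    # Next midnight in UTC, as epoch milliseconds (mirrors nextDeadline above).
    tomorrow = datetime.now(timezone.utc).date() + timedelta(days=1)
    midnight = datetime.combine(tomorrow, datetime.min.time(), tzinfo=timezone.utc)
    return int(midnight.timestamp() * 1000)


sc = SparkContext()
ssc = StreamingContext(sc, 30)

# Driver-side mutable state: the reloadable reference RDD and the next reload time.
# 'path_to_data' is the placeholder from the question; sc.textFile is an assumption
# about how the reference data would actually be loaded.
state = {
    'init_rdd': sc.textFile('path_to_data'),
    'deadline': next_deadline_ms(),
}

lines = ssc.socketTextStream('localhost', 9999)


def maybe_reload(time, rdd):
    # Runs on the driver once per batch; used only as a scheduling hook.
    now_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
    if now_ms > state['deadline']:
        state['init_rdd'] = sc.textFile('path_to_data')
        state['deadline'] = next_deadline_ms()


lines.foreachRDD(maybe_reload)

# transform re-evaluates this closure every batch, so it picks up the new RDD
# after a reload (assuming the records are key/value pairs so the join applies).
res = lines.transform(lambda rdd: rdd.join(state['init_rdd']))
res.pprint()

ssc.start()
ssc.awaitTermination()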

Regarding apache-spark - how to periodically update an RDD in Spark Streaming, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/45031215/
