gpt4 book ai didi

statistics - 是否可以使用 Spark Streaming 实时更新值?

转载 作者:行者123 更新时间:2023-12-02 03:26:01 24 4
gpt4 key购买 nike

假设我有一个 Double 值流,我想每十秒计算一次平均值。我怎样才能拥有一个不需要重新计算平均值而是更新它的滑动窗口,比方说,删除最旧的 10 秒的部分并仅添加新的 10 秒值?

最佳答案

TL;DR : 使用 reduceByWindow 及其两个函数参数(跳转到代码片段的最后一段)

对您的问题有两种解释,一种是具体的(我如何获得一小时的运行平均值,每 2 秒更新一次),另一种是一般的(我如何获得以稀疏方式更新状态的计算) .这是一般问题的答案。

首先,请注意有一种方法可以表示您的数据,这样您的平均更新很容易计算,基于 windowed DStream :这将您的数据表示为流的增量构建,具有最大共享。但是,如您所述,重新计算每批的平均值在计算上效率较低。

如果您确实想对可逆的复杂状态计算进行更新,但又不想触及流的构造,可以使用updateStateByKey。 – 但是 Spark 无法帮助您在流中反射(reflect)计算的增量方面,您必须自己管理它。

在这里,您确实拥有一些简单且可逆的东西,并且您没有键的概念。您可以使用 reduceByWindow使用其逆归约参数,使用可以让您计算增量均值的常用函数。

val myInitialDStream: DStream[Float]

val myDStreamWithCount: DStream[(Float, Long)] =
myInitialDStream.map((x) => (x, 1L))

def addOneBatchToMean(previousMean: (Float, Long), newBatch: (Float, Long)): (Float, Long) =
(previousMean._1 + newBatch._1, previousMean._2 + newBatch._2)

def removeOneBatchToMean(previousMean: (Float, Long), oldBatch: (Float, Long)): (Float, Long) =
(previousMean._1 - oldBatch._1, previousMean._2 - oldBatch._2)

val runningMeans = myDStreamWithCount.reduceByWindow(addOneBatchToMean, removeOneBatchToMean, Durations.seconds(3600), Duractions.seconds(2))

您得到一个单元素 RDD 流,每个元素都包含一对 (m, n),其中 m 是您在 1h 窗口上的运行总和,n 是元素的数量1小时窗口。只需返回(或 map 到)m/n 即可获得平均值。

关于statistics - 是否可以使用 Spark Streaming 实时更新值?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30256209/

24 4 0