
apache-spark - How can I compute aggregations over a window when sensor readings are only sent if they have changed since the last event?

Reposted. Author: 行者123. Updated: 2023-12-04 04:33:00

How do you compute aggregations over a window from a sensor that only sends a new event when its value has changed since the last event? The sensor readings are taken at fixed intervals, e.g. every 5 seconds, but a reading is only forwarded if it has changed since the previous one.

So, if I want to compute the average signal_strength per device:

eventsDF = ... 
avgSignalDF = eventsDF.groupBy("deviceId").avg("signal_strength")

For example, the events a device sends over a one-minute window:
event_time  device_id  signal_strength
12:00:00    1          5
12:00:05    1          4
12:00:30    1          5
12:00:45    1          6
12:00:55    1          5

The same dataset with the events that were never actually sent filled in:
event_time  device_id  signal_strength
12:00:00    1          5
12:00:05    1          4
12:00:10    1          4
12:00:15    1          4
12:00:20    1          4
12:00:25    1          4
12:00:30    1          5
12:00:35    1          5
12:00:40    1          5
12:00:45    1          6
12:00:50    1          6
12:00:55    1          5

The signal_strength values sum to 57, so the average is 57/12 = 4.75.
How can these missing readings be inferred with Spark Structured Streaming, so that the average is computed from the inferred values?

Note: I have used the average as an example aggregation, but the solution needs to work for any aggregation function.
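For reference, the expected result can be sketched in plain Scala without Spark, assuming a fixed 5-second interval: each reading is repeated for every slot until the next event, and the aggregate then runs over the expanded sequence. The names below (`GapFillSketch`, `fill`) are illustrative, not from the original post:

```scala
object GapFillSketch {
  case class Event(eventTime: Long, deviceId: Int, signalStrength: Int)

  // Expand the sparse events: repeat each reading for every `interval`-second
  // slot until the next event, then append the final reading itself.
  // Assumes at least two events, sorted by eventTime.
  def fill(events: Seq[Event], interval: Long): Seq[Int] =
    events.sliding(2).flatMap { case Seq(a, b) =>
      Seq.fill(((b.eventTime - a.eventTime) / interval).toInt)(a.signalStrength)
    }.toSeq :+ events.last.signalStrength

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Event(0, 1, 5), Event(5, 1, 4), Event(30, 1, 5),
      Event(45, 1, 6), Event(55, 1, 5))
    val filled = fill(events, interval = 5)
    println(filled.size)                       // 12
    println(filled.sum)                        // 57
    println(filled.sum.toDouble / filled.size) // 4.75
  }
}
```

This matches the filled-in table above: 12 rows, a sum of 57 and an average of 4.75.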

Best Answer

Edit:

I modified the logic to compute the average from the filtered dataframe only, accounting for the gaps.

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

//input structure
case class StreamInput(event_time: Long, device_id: Int, signal_strength: Int)

//columns for which we want to maintain state between micro-batches
case class StreamState(
  prevTime: Long,              //event_time of the previous event
  prevSignalStrength: Int,     //signal_strength of the previous event
  currentTime: Long,           //event_time of the latest event seen
  rowCount: Int,               //total rows so far, including inferred ones
  total_signal_strength: Int,  //running total, including inferred rows
  avg: Double)

//final result structure
case class StreamResult(event_time: Long, device_id: Int, signal_strength: Int, avg: Double)

val filteredDF: Dataset[StreamInput] = ??? //get input (filtered rows only)

val interval = 5 //event_time interval in seconds

//use .mapGroupsWithState to maintain state for the running sum & total row count so far;
//set a timeout threshold to indicate how long you wish to maintain the state
val avgDF = filteredDF.groupByKey(_.device_id)
  .mapGroupsWithState[StreamState, StreamResult](GroupStateTimeout.NoTimeout()) {
    case (id: Int, eventIter: Iterator[StreamInput], state: GroupState[StreamState]) =>
      val events = eventIter.toSeq
      val currentTime = events.map(_.event_time).last
      val currentSignalStrength = events.map(_.signal_strength).last

      val updatedSession = if (state.exists) {
        //if state exists, update it with the new values
        val existingState = state.get
        val prevTime = existingState.currentTime
        //number of interval-second slots since the previous event (missed rows + 1)
        val currentRowCount = ((currentTime - prevTime) / interval).toInt
        val rowCount = existingState.rowCount + currentRowCount

        //current value + the previous value repeated for each missed slot + previous total
        val total_signal_strength = currentSignalStrength +
          existingState.prevSignalStrength * (currentRowCount - 1) +
          existingState.total_signal_strength

        StreamState(
          prevTime,
          currentSignalStrength,
          currentTime,
          rowCount,
          total_signal_strength,
          total_signal_strength / rowCount.toDouble)
      } else {
        //if there is no earlier state, initialize from the first batch
        val runningSum = events.map(_.signal_strength).sum
        StreamState(0, currentSignalStrength, currentTime, events.size,
          runningSum, runningSum / events.size.toDouble)
      }

      //save the updated state
      state.update(updatedSession)
      StreamResult(currentTime, id, currentSignalStrength, updatedSession.avg)
  }

val result = avgDF
  .writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .start()

The idea is to compute two new columns:
  • rowCount: the total number of rows that would exist if nothing had been filtered out.
  • total_signal_strength: the running total of signal_strength up to the current row (this also includes the missed rows).

  • It is computed as:
    total_signal_strength =
    current row's signal_strength +
    (previous row's signal_strength * (slots - 1)) +
    //slots is the number of interval-length slots since the previous event, computed by comparing the previous and current event_time
    previous total_signal_strength

    Intermediate state format:
    +----------+---------+---------------+---------------------+--------+
    |event_time|device_id|signal_strength|total_signal_strength|rowCount|
    +----------+---------+---------------+---------------------+--------+
    |         0|        1|              5|                    5|       1|
    |         5|        1|              4|                    9|       2|
    |        30|        1|              5|                   30|       7|
    |        45|        1|              6|                   46|      10|
    |        55|        1|              5|                   57|      12|
    +----------+---------+---------------+---------------------+--------+
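The running totals in that table can be reproduced outside Spark with the same recurrence — a minimal sketch, assuming one event per row; the object and method names here are illustrative:

```scala
object StateRecurrenceSketch {
  // Fold the sparse (event_time, signal_strength) pairs into per-event state
  // tuples (time, strength, rowCount, total_signal_strength), applying:
  //   total = current + previous strength * (slots - 1) + previous total
  def states(events: Seq[(Long, Int)], interval: Int): Seq[(Long, Int, Int, Int)] = {
    val (t0, s0) = events.head
    events.tail.scanLeft((t0, s0, 1, s0)) {
      case ((prevTime, prevStrength, rowCount, total), (t, s)) =>
        val slots = ((t - prevTime) / interval).toInt // slots since the last event
        (t, s, rowCount + slots, s + prevStrength * (slots - 1) + total)
    }
  }

  def main(args: Array[String]): Unit =
    states(Seq((0L, 5), (5L, 4), (30L, 5), (45L, 6), (55L, 5)), 5)
      .foreach { case (t, s, rows, total) =>
        println(f"$t%3d  $s  $total%3d  $rows%2d  ${total.toDouble / rows}")
      }
}
```

The last state it prints is a total of 57 over 12 rows, i.e. an average of 4.75, matching the table.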

    Final output:
    +----------+---------+---------------+-----------------+
    |event_time|device_id|signal_strength|              avg|
    +----------+---------+---------------+-----------------+
    |         0|        1|              5|              5.0|
    |         5|        1|              4|              4.5|
    |        30|        1|              5|4.285714285714286|
    |        45|        1|              6|              4.6|
    |        55|        1|              5|             4.75|
    +----------+---------+---------------+-----------------+

For "apache-spark - How can I compute aggregations over a window when sensor readings are only sent if they have changed since the last event?", see the similar question on Stack Overflow: https://stackoverflow.com/questions/52747092/
