python - Spark: Python Windowed Functions for Data Frames


The use case is to capture the time difference between streaming sensor entries where the station and part are the same, compare it against a tolerance, and potentially trigger an alert when it is out of range. I am currently parsing the fields into a DataFrame and registering it as a table so I can run a SQL query using the LAG function.

from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Split each "station|part|timestamp" record into a (station, part, event) tuple.
events = rawFilter.map(lambda x: x.split("|")).map(lambda x: (x[0], x[1], x[2]))

eventSchema = StructType([
    StructField("station", StringType(), False),
    StructField("part", StringType(), False),
    StructField("event", TimestampType(), False)])

eventDF = sqlContext.createDataFrame(events, eventSchema)
eventDF.registerTempTable("events_table")

%sql select station, part, event, prev_event,
       cast(event as double) - cast(prev_event as double) as CycleTime
     from (select station, part, event,
             LAG(event) over (partition by station, part order by event) as prev_event
           from events_table) x
     limit 10

Example Streaming Sensor Data:
station1|part1|<timestamp>
station2|part2|<timestamp>
station3|part3|<timestamp>
station1|part1|<timestamp>
station1|part1|<timestamp>
station1|part1|<timestamp>
station3|part3|<timestamp>
station1|part1|<timestamp>

What I would like to understand is how to accomplish the window function on the DataFrame itself, so that the resulting table already has the time difference computed.

Part 2 of this question is understanding how to handle it when the part changes. In that case the CycleTime should not be calculated, or should stop; however, the time difference between two different parts at the same station is a separate calculation called ChangeOver. I do not see how to accomplish this with Spark Streaming, because the window could stretch over several days before the part changes, so I am considering pushing the data into HBase or something else to calculate ChangeOver.
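For illustration only, here is a minimal sketch of how the part-change case might be handled directly on a DataFrame, assuming the eventDF defined above (with event as a timestamp). Partitioning the window by station alone makes the previous part visible via LAG, so a when(...) condition can route the difference into either a cycle-time or a changeover column (null where it does not apply). The names w_station, prev_part, cycle_time, and change_over are hypothetical, not part of the original question.

from pyspark.sql.functions import col, lag, when
from pyspark.sql.window import Window

# Hypothetical sketch: partition by station only, so rows where the part
# changes land in the same window as the preceding part.
w_station = Window.partitionBy(col("station")).orderBy(col("event"))

annotated = (eventDF
    .withColumn("prev_event", lag(col("event")).over(w_station))
    .withColumn("prev_part", lag(col("part")).over(w_station))
    # Same part as the previous row -> CycleTime; different part -> ChangeOver.
    # Casting timestamps to double yields seconds, matching the SQL above.
    .withColumn("cycle_time",
        when(col("part") == col("prev_part"),
             col("event").cast("double") - col("prev_event").cast("double")))
    .withColumn("change_over",
        when(col("part") != col("prev_part"),
             col("event").cast("double") - col("prev_event").cast("double"))))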

Best Answer

Window definitions on DataFrames closely follow the SQL conventions, with the partitionBy, orderBy, rangeBetween, and rowsBetween methods corresponding to the equivalent SQL clauses.

from pyspark.sql.functions import col, lag, unix_timestamp
from pyspark.sql.window import Window

rawDF = sc.parallelize([
    ("station1", "part1", "2015-01-03 00:11:02"),
    ("station2", "part2", "2015-02-01 10:20:10"),
    ("station3", "part3", "2015-03-02 00:30:00"),
    ("station1", "part1", "2015-05-01 01:07:00"),
    ("station1", "part1", "2015-01-13 05:16:10"),
    ("station1", "part1", "2015-11-20 10:22:40"),
    ("station3", "part3", "2015-09-04 03:15:22"),
    ("station1", "part1", "2015-03-05 00:41:33")
]).toDF(["station", "part", "event"])

# Convert the timestamp strings to unix seconds so the difference is a plain subtraction.
eventDF = rawDF.withColumn("event", unix_timestamp(col("event")))

w = Window.partitionBy(col("station")).orderBy(col("event"))

(eventDF
    .withColumn("prev_event", lag(col("event")).over(w))
    .withColumn("cycle_time", col("event") - col("prev_event")))

Regarding python - Spark: Python Windowed Functions for Data Frames, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/33921571/
