gpt4 book ai didi

python - 在 Azure Databricks 中按范围内的值进行分组

转载 作者:太空宇宙 更新时间:2023-11-03 23:56:27 25 4
gpt4 key购买 nike

考虑以下数据:

EventDate,Value
1.1.2019,11
1.2.2019,5
1.3.2019,6
1.4.2019,-15
1.5.2019,-20
1.6.2019,-30
1.7.2019,12
1.8.2019,20

我想创建当这些值在阈值内时的组:

 1. > 10
2. <=10 >=-10
3. >-10

结果应该是值的开始和结束处于某种状态:

1.1.2019, 1.1.2019, [11]
1.2.2019, 1.3.2019, [5, 6]
1.4.2019, 1.6.2019, [-15, -20, -30]
1.7.2019, 1.8.2018, [12, 20]

我相信答案就在窗口函数内,但我对 databricks 相当陌生,我还不明白如何使用它。

这是一个基于将数据帧作为列表循环的工作(python)解决方案,但是我更喜欢直接在数据帧上工作的解决方案以提高性能。

from pyspark.sql.functions import *
import pandas as pd
STATETHRESHOLDCHARGE = 10
list = [{"eventDateTime":x["EventDate"], "value":x["Value"]} for x in dataframe.sort(dfArrayOneCast.EventDate).rdd.collect()]
cycles = []
previous = None
for row in list:
currentState = 'charge'
if row["value"] < STATETHRESHOLDCHARGE and row["value"] > (STATETHRESHOLDCHARGE * -1):
currentState = 'idle'
if row["value"] <= (STATETHRESHOLDCHARGE * -1):
currentState = 'discharge'

eventDateTime = row["eventDateTime"]
if previous is None or previous["state"] != currentState:
previous = {"start":row["eventDateTime"], "end":row["eventDateTime"], "values":[row["value"]], "timestamps":[row["eventDateTime"]], "state":currentState}
cycles.append(previous)
else:
previous["end"] = row["eventDateTime"]
previous["values"].append(row["value"])
previous["timestamps"].append(row["eventDateTime"])

display(cycles)

最佳答案

假设 df 数据框中有上述数据,让我们逐条分析

from pyspark.sql.functions import col, last, lag, udf, when, collect_list
from pyspark.sql.types import StringType
value = 'value'
date = 'EventDate'
valueBag = 'valueBag'

def bagTransform(v):
if v > 10:
return 'charging'
elif v < -10:
return 'discharging'
else:
return 'idle'

bagTransformUDF = udf(bagTransform, StringType())

withBaggedValue = df.withColumn(valueBag, bagTransformUDF(col(value)))

因此,首先我们将值放入您声明的范围中,现在我们可以使用 lag 将窗口移动到先前的值:

from pyspark.sql import Window
windowSpec = Window.orderBy(date)
prevValueBag = 'prevValueBag'
bagBeginning = 'bagBeginning'

withLag = (withBaggedValue
.withColumn(prevValueBag, lag(withBaggedValue[valueBag]).over(windowSpec)))

现在有趣的部分开始:我们检测变化点并临时分配当前事件日期或 null:

withInitialBeginnings = withLag.withColumn(bagBeginning, when((col(prevValueBag) != col(valueBag)) | col(prevValueBag).isNull(), col(date)).otherwise(None))

并使用最后找到的值填写它们

withFilledBeginnings = (withInitialBeginnings.withColumn(bagBeginning, 
last(col(bagBeginning), ignorenulls=True)
.over(windowSpec)))
display(withFilledBeginnings)

results table有了这个集合,我们就可以简单地聚合起始点

aggregate = withFilledBeginnings.groupby(col(bagBeginning)).agg(collect_list(value))

display(aggregate)

aggregated results

如果您还需要结束日期,您可以使用 pyspark.sql.functions.lead 进行类似的预处理,该预处理与 last 对称但向前。

关于python - 在 Azure Databricks 中按范围内的值进行分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57580140/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com