- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
考虑以下数据:
EventDate,Value
1.1.2019,11
1.2.2019,5
1.3.2019,6
1.4.2019,-15
1.5.2019,-20
1.6.2019,-30
1.7.2019,12
1.8.2019,20
我想创建当这些值在阈值内时的组:
1. > 10
2. <=10 >=-10
3. >-10
结果应该是值的开始和结束处于某种状态:
1.1.2019, 1.1.2019, [11]
1.2.2019, 1.3.2019, [5, 6]
1.4.2019, 1.6.2019, [-15, -20, -30]
1.7.2019, 1.8.2018, [12, 20]
我相信答案就在窗口函数内,但我对 databricks 相当陌生,我还不明白如何使用它。
这是一个基于将数据帧作为列表循环的工作(python)解决方案,但是我更喜欢直接在数据帧上工作的解决方案以提高性能。
from pyspark.sql.functions import *
import pandas as pd
STATETHRESHOLDCHARGE = 10
list = [{"eventDateTime":x["EventDate"], "value":x["Value"]} for x in dataframe.sort(dfArrayOneCast.EventDate).rdd.collect()]
cycles = []
previous = None
for row in list:
currentState = 'charge'
if row["value"] < STATETHRESHOLDCHARGE and row["value"] > (STATETHRESHOLDCHARGE * -1):
currentState = 'idle'
if row["value"] <= (STATETHRESHOLDCHARGE * -1):
currentState = 'discharge'
eventDateTime = row["eventDateTime"]
if previous is None or previous["state"] != currentState:
previous = {"start":row["eventDateTime"], "end":row["eventDateTime"], "values":[row["value"]], "timestamps":[row["eventDateTime"]], "state":currentState}
cycles.append(previous)
else:
previous["end"] = row["eventDateTime"]
previous["values"].append(row["value"])
previous["timestamps"].append(row["eventDateTime"])
display(cycles)
最佳答案
假设 df 数据框中有上述数据,让我们逐条分析
from pyspark.sql.functions import col, last, lag, udf, when, collect_list
from pyspark.sql.types import StringType
value = 'value'
date = 'EventDate'
valueBag = 'valueBag'
def bagTransform(v):
if v > 10:
return 'charging'
elif v < -10:
return 'discharging'
else:
return 'idle'
bagTransformUDF = udf(bagTransform, StringType())
withBaggedValue = df.withColumn(valueBag, bagTransformUDF(col(value)))
因此,首先我们将值放入您声明的范围中,现在我们可以使用 lag
将窗口移动到先前的值:
from pyspark.sql import Window
windowSpec = Window.orderBy(date)
prevValueBag = 'prevValueBag'
bagBeginning = 'bagBeginning'
withLag = (withBaggedValue
.withColumn(prevValueBag, lag(withBaggedValue[valueBag]).over(windowSpec)))
现在有趣的部分开始:我们检测变化点并临时分配当前事件日期或 null:
withInitialBeginnings = withLag.withColumn(bagBeginning, when((col(prevValueBag) != col(valueBag)) | col(prevValueBag).isNull(), col(date)).otherwise(None))
并使用最后找到的值填写它们
withFilledBeginnings = (withInitialBeginnings.withColumn(bagBeginning,
last(col(bagBeginning), ignorenulls=True)
.over(windowSpec)))
display(withFilledBeginnings)
aggregate = withFilledBeginnings.groupby(col(bagBeginning)).agg(collect_list(value))
display(aggregate)
如果您还需要结束日期,您可以使用 pyspark.sql.functions.lead 进行类似的预处理,该预处理与 last
对称但向前。
关于python - 在 Azure Databricks 中按范围内的值进行分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57580140/
我已经开始阅读 Databricks 推出的 Unity Catalog。我了解它试图解决的基本问题,但我不了解目录到底是什么。 这在 Databricks 文档中可用, A catalog cont
我正在努力了解 Databricks。 我发现文档逐步从 S3 或 Azure Datalake 导入数据,然后输出到 Azure Synapse Analytics 或其他数据仓库解决方案。 快速播
我想以编程方式将(Python Wheel)库添加到 /Shared Databricks 上的工作区。在 GUI(工作区 > 导入 > 库)中很容易做到,但我无法弄清楚如何在 Databricks
我正在创建一个带有公司 Logo 的 databricks 笔记本模板。使用以下代码显示图像会引发错误。 代码: %md 错误: HTTP ERROR 403: Invalid or missing
我将使用这张图片来形象化我的问题: Databricks1 在 Databricks 中创建数据库(和表)并将其数据存储在存储帐户中。在Databricks2中我想读取数据:Databricks2只有
有没有办法通过 python 笔记本确定现有的 Azure Databricks Secret Scope 是否由 Key Vault 或 Databricks 支持? dbutils.secrets
我正在尝试连接到 Databricks 上的 Spark 集群,并且正在学习本教程:https://docs.databricks.com/dev-tools/dbt.html .我安装了 dbt-d
我们可以使用Autoloader跟踪是否已从 S3 存储桶加载的文件。我关于 Autoloader 的问题:有没有办法读取 Autoloader 数据库以获取已加载文件的列表? 我可以在 AWS Gl
我们可以使用一些帮助来了解如何将 Spark Driver 和 worker 日志发送到 Azure Databricks 之外的目的地,例如Azure Blob 存储或使用 Eleastic-bea
将我的 Azure Databricks 从标准升级到主要,尝试开始使用 Databricks Delta: create table t using delta as select * from t
现在,databricks 自动加载器需要一个目录路径,从中加载所有文件。但是,如果其他类型的日志文件也开始进入该目录 - 有没有办法让 Autoloader 在准备数据帧时排除这些文件? df =
有人可以让我知道如何使用 databricks dbutils 从文件夹中删除所有文件。 我尝试了以下但不幸的是,Databricks 不支持通配符。 dbutils.fs.rm('adl://azu
我是 azure 的新手和databricks ,我学会了如何安装 blob 和利用,但我有一些疑问,而且我还没有找到任何文档的任何答案。所以请帮我解释一下: dbutils.fs.mount(
尝试遍历已安装的 Databricks 卷中的目录时遇到 ClassCastException。 java.lang.ClassCastException: com.databricks.backen
尝试遍历已安装的 Databricks 卷中的目录时遇到 ClassCastException。 java.lang.ClassCastException: com.databricks.backen
我正在运行 Databricks Community Edition,我想从以下 mnt 目录中删除文件 /mnt/driver-daemon/jars 我运行 dbutils 命令: dbutils
我已经在我的机器上创建了“.netrc”文件并尝试在 databricks rest api 调用下面。但它总是给出未经授权的错误。如何在 Databricks 中创建 .netrc 文件? curl
没有意识到 shift+enter 运行一个单元格。我正在写一个 delete from table 并按下 shift enter 删除了表中的所有数据。 最佳答案 在 Delta Lake 表中,
我需要访问 Azure Files来自 Azure Databricks .根据文档 Azure Blobs受支持,但我需要此代码来处理 Azure 文件: dbutils.fs.mount( s
我正在尝试使用服务主体从 Databricks 连接到 Synapse。 我已经在集群配置中配置了服务主体 fs.azure.account.auth.type..dfs.core.windows.n
我是一名优秀的程序员,十分优秀!