
apache-spark - How to interpolate a column within a grouped object in PySpark?

Reposted. Author: 行者123. Updated: 2023-12-01 10:19:42

How do I interpolate a PySpark DataFrame within grouped data?

For example:

I have a PySpark DataFrame with the following columns:

+--------+-------------------+--------+
|webID |timestamp |counts |
+--------+-------------------+--------+
|John |2018-02-01 03:00:00|60 |
|John |2018-02-01 03:03:00|66 |
|John |2018-02-01 03:05:00|70 |
|John |2018-02-01 03:08:00|76 |
|Mo |2017-06-04 01:05:00|10 |
|Mo |2017-06-04 01:07:00|20 |
|Mo |2017-06-04 01:10:00|35 |
|Mo |2017-06-04 01:11:00|40 |
+--------+-------------------+--------+

I need to interpolate the counts data for John and Mo to one data point per minute, within each of their own time ranges. I am open to any simple linear interpolation - but note that my real data comes every few seconds and I want to interpolate to every second.

So the result should be:

+--------+-------------------+--------+
|webID |timestamp |counts |
+--------+-------------------+--------+
|John |2018-02-01 03:00:00|60 |
|John |2018-02-01 03:01:00|62 |
|John |2018-02-01 03:02:00|64 |
|John |2018-02-01 03:03:00|66 |
|John |2018-02-01 03:04:00|68 |
|John |2018-02-01 03:05:00|70 |
|John |2018-02-01 03:06:00|72 |
|John |2018-02-01 03:07:00|74 |
|John |2018-02-01 03:08:00|76 |
|Mo |2017-06-04 01:05:00|10 |
|Mo |2017-06-04 01:06:00|15 |
|Mo |2017-06-04 01:07:00|20 |
|Mo |2017-06-04 01:08:00|25 |
|Mo |2017-06-04 01:09:00|30 |
|Mo |2017-06-04 01:10:00|35 |
|Mo |2017-06-04 01:11:00|40 |
+--------+-------------------+--------+

The new rows need to be added to my original DataFrame.
I am looking for a PySpark solution.
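
(For a single webID, the requested transform is essentially pandas' resample-and-interpolate. A minimal pandas-only sketch using John's rows from the table above, purely for illustration; the names here are not part of the question:)

import pandas as pd

john = pd.DataFrame({
    "timestamp": pd.to_datetime([
        "2018-02-01 03:00:00", "2018-02-01 03:03:00",
        "2018-02-01 03:05:00", "2018-02-01 03:08:00"]),
    "counts": [60, 66, 70, 76],
}).set_index("timestamp")

# Upsample to one row per minute and fill the gaps linearly.
print(john.resample("60S").interpolate())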

Best Answer

If you are using Python, the shortest way to get this done is to reuse existing Pandas functions with a GROUPED_MAP udf:

from operator import attrgetter
from pyspark.sql.types import StructType
from pyspark.sql.functions import pandas_udf, PandasUDFType

def resample(schema, freq, timestamp_col="timestamp", **kwargs):
    @pandas_udf(
        StructType(sorted(schema, key=attrgetter("name"))),
        PandasUDFType.GROUPED_MAP)
    def _(pdf):
        # Index by time, upsample to the requested frequency and interpolate linearly.
        pdf.set_index(timestamp_col, inplace=True)
        pdf = pdf.resample(freq).interpolate()
        # Forward-fill any remaining gaps, then restore the timestamp column and
        # sort the columns alphabetically to match the sorted schema above.
        pdf.ffill(inplace=True)
        pdf.reset_index(drop=False, inplace=True)
        pdf.sort_index(axis=1, inplace=True)
        return pdf
    return _
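
As a side note not in the original answer: the GROUPED_MAP pandas_udf form above is the Spark 2.3/2.4 API. On Spark 3.x the same logic is usually expressed with groupBy(...).applyInPandas, which takes a plain Python function plus a schema. A minimal sketch of that variant (resample_pdf is an illustrative name; same body, only the wiring differs):

from operator import attrgetter
from pyspark.sql.types import StructType

def resample_pdf(pdf, freq="60S", timestamp_col="timestamp"):
    # Same steps as the UDF above, just a plain pandas function.
    pdf = pdf.set_index(timestamp_col).resample(freq).interpolate()
    pdf.ffill(inplace=True)
    return pdf.reset_index(drop=False).sort_index(axis=1)

# Spark 3.x call site (df is created further below); the schema must list the
# columns in the same sorted order that sort_index(axis=1) produces:
# resampled = df.groupBy("webID").applyInPandas(
#     resample_pdf,
#     schema=StructType(sorted(df.schema, key=attrgetter("name"))))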

Applied to your data:

from pyspark.sql.functions import to_timestamp

df = spark.createDataFrame([
    ("John", "2018-02-01 03:00:00", 60),
    ("John", "2018-02-01 03:03:00", 66),
    ("John", "2018-02-01 03:05:00", 70),
    ("John", "2018-02-01 03:08:00", 76),
    ("Mo", "2017-06-04 01:05:00", 10),
    ("Mo", "2017-06-04 01:07:00", 20),
    ("Mo", "2017-06-04 01:10:00", 35),
    ("Mo", "2017-06-04 01:11:00", 40),
], ("webID", "timestamp", "counts")).withColumn(
    "timestamp", to_timestamp("timestamp")
)

df.groupBy("webID").apply(resample(df.schema, "60S")).show()

This produces:

+------+-------------------+-----+
|counts| timestamp|webID|
+------+-------------------+-----+
| 60|2018-02-01 03:00:00| John|
| 62|2018-02-01 03:01:00| John|
| 64|2018-02-01 03:02:00| John|
| 66|2018-02-01 03:03:00| John|
| 68|2018-02-01 03:04:00| John|
| 70|2018-02-01 03:05:00| John|
| 72|2018-02-01 03:06:00| John|
| 74|2018-02-01 03:07:00| John|
| 76|2018-02-01 03:08:00| John|
| 10|2017-06-04 01:05:00| Mo|
| 15|2017-06-04 01:06:00| Mo|
| 20|2017-06-04 01:07:00| Mo|
| 25|2017-06-04 01:08:00| Mo|
| 30|2017-06-04 01:09:00| Mo|
| 35|2017-06-04 01:10:00| Mo|
| 40|2017-06-04 01:11:00| Mo|
+------+-------------------+-----+
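
The columns come back in alphabetical order (counts, timestamp, webID) because the UDF sorts the pandas columns with sort_index(axis=1) and the schema passed to pandas_udf is sorted to match. If the original column order is wanted, a select can be appended; a minimal sketch:

(df
    .groupBy("webID")
    .apply(resample(df.schema, "60S"))
    .select("webID", "timestamp", "counts")
    .show())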

This works under the assumption that both the input data and the interpolated data for a single webID can fit in the memory of a single node (in general, other exact, non-iterative solutions would have to make similar assumptions). If that is not the case, you can easily approximate it with overlapping windows:

from pyspark.sql.functions import window

partial = (df
    .groupBy("webID", window("timestamp", "5 minutes", "3 minutes")["start"])
    .apply(resample(df.schema, "60S")))

and aggregate the final result:

from pyspark.sql.functions import mean

(partial
    .groupBy("webID", "timestamp")
    .agg(mean("counts").alias("counts"))
    # Order by key and timestamp, only for consistent presentation
    .orderBy("webID", "timestamp")
    .show())

This is of course much more expensive (there are two shuffles, and some values will be computed multiple times), but it can also leave gaps if the overlap is not large enough to include the next observation.

+-----+-------------------+------+
|webID| timestamp|counts|
+-----+-------------------+------+
| John|2018-02-01 03:00:00| 60.0|
| John|2018-02-01 03:01:00| 62.0|
| John|2018-02-01 03:02:00| 64.0|
| John|2018-02-01 03:03:00| 66.0|
| John|2018-02-01 03:04:00| 68.0|
| John|2018-02-01 03:05:00| 70.0|
| John|2018-02-01 03:08:00| 76.0|
| Mo|2017-06-04 01:05:00| 10.0|
| Mo|2017-06-04 01:06:00| 15.0|
| Mo|2017-06-04 01:07:00| 20.0|
| Mo|2017-06-04 01:08:00| 25.0|
| Mo|2017-06-04 01:09:00| 30.0|
| Mo|2017-06-04 01:10:00| 35.0|
| Mo|2017-06-04 01:11:00| 40.0|
+-----+-------------------+------+
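
To decide how much overlap is enough, one option (not part of the original answer) is to first measure the largest gap between consecutive observations per webID and size the window and slide durations accordingly; a sketch using standard window functions:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Largest gap, in seconds, between consecutive observations for each webID.
w = Window.partitionBy("webID").orderBy("timestamp")

(df
    .withColumn("prev_ts", F.lag("timestamp").over(w))
    .withColumn("gap_seconds",
                F.col("timestamp").cast("long") - F.col("prev_ts").cast("long"))
    .groupBy("webID")
    .agg(F.max("gap_seconds").alias("max_gap_seconds"))
    .show())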

Regarding apache-spark - How to interpolate a column within a grouped object in PySpark?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54612642/
