gpt4 book ai didi

pyspark - 如何在pyspark数据帧(时间序列)中计算每日基础

转载 作者:行者123 更新时间:2023-12-04 08:49:52 24 4
gpt4 key购买 nike

所以我有一个数据框,我想计算一些数量,让我们说每天..假设我们有 10 列 col1,col2,col3,col4... coln,其中每一列都依赖于值 col1 , col2, col3 , col4.. 等等,日期根据 id 重置..

    +--------+----+----              +----+
date |col1|id |col2|. . |coln
+--------+----+---- +----+
2020-08-01| 0| M1 | . . . 3|
2020-08-02| 4| M1 | 10|
2020-08-03| 3| M1 | . . . 9 |
2020-08-04| 2| M1 | . . . 8 |
2020-08-05| 1| M1 | . . . 7 |
2020-08-06| 0| M1 | . . . 0 |
2020-08-01| 0| M2 | . . . 0 |
2020-08-02| 0| M2 | . . . . 1 |
2020-08-03| 0| M2 | . . . . 2 |
+---------+----+----+-----------------+
假设我们执行此数据帧,此 df 中可能有更多列...
为了说明这一点,假设今天的日期是 2020-08-01。我们做了一些计算,我们在 coln 得到了一些输出,比如说 coln =3在 2020-08-01,我想在 2020-08-02 coln == col1,即 col1 ==3 并在 2020-08-02 进行计算等等......所以 df 的例子看起来像下面这个
    +--------+----+----              +----+
date |col1|id |col2|. . |coln
+--------+----+---- +----+
2020-08-01| 0| M1 | . . . 3|
2020-08-02| 3| M1 | 10|
2020-08-03|10| M1 | . . . 9 |
2020-08-04| 9| M1 | . . . 8 |
2020-08-05| 8| M1 | . . . 7 |
2020-08-06| 7| M1 | . . . 0 |
2020-08-01| 0| M2 | . . . 1 |
2020-08-02| 1| M2 | . . . . 2 |
2020-08-03| 2| M2 | . . . . 0 |
+---------+----+----+-----------------+

如果你们能给我一个例子来说明如何在 pyspark 中做到这一点,那就太好了。
例如:让我们说 col3 = col1+ col2最初,假设 col1 全为 0。
df1_schema = StructType([StructField("Date", StringType(), True),\
StructField("col1", IntegerType(), True),\
StructField("id", StringType(), True),\
StructField("col2", IntegerType(), True),\
StructField("col3", IntegerType(), True),\
StructField("coln", IntegerType(), True)])
df_data = [('2020-08-01',0,'M1',3,3,2),('2020-08-02',0,'M1',2,3,1),\
('2020-08-03',0,'M1',3,3,3),('2020-08-04',0,'M1',3,3,1),\
('2020-08-01',0,'M2',1,3,1),('2020-08-02',0,'M2',-1,3,2)]
rdd = sc.parallelize(df_data)
df1 = sqlContext.createDataFrame(df_data, df1_schema)
df1 = df1.withColumn("Date",to_date("Date", 'yyyy-MM-dd'))
df1.show()

+----------+----+---+----+----+----+
| Date|col1| id|col2|col3|coln|
+----------+----+---+----+----+----+
|2020-08-01| 0| M1| 3| 3| 2|
|2020-08-02| 0| M1| 2| 3| 1|
|2020-08-03| 0| M1| 3| 3| 3|
|2020-08-04| 0| M1| 3| 3| 1|
|2020-08-01| 0| M2| 1| 3| 1|
|2020-08-02| 0| M2| -1| 3| 2|
+----------+----+---+----+----+----+
所以让我们关注 2020-08-01这是开始,我们想要的是 col1+col2,即 3 = col3。在依赖于 col3..col4...col5.. 的第 n 次计算之后,假设我们得到了一些数字 coln=3。在计算完成后,我们想要在 2020-08-02 , coln=3 应该在 col1
所以它是在 2020-08-01 计算完成后动态变化的
enter image description here
所以我想要的 df 看起来像这样
+----------+----+---+----+----+----+
| Date|col1| id|col2|col3|coln|
+----------+----+---+----+----+----+
|2020-08-01| 0| M1| 3| 3| 2|
|2020-08-02| 2| M1| 2| 5| 1|
|2020-08-03| 1| M1| 3| 4| 3|
|2020-08-04| 3| M1| 3| 6| 1|
|2020-08-01| 1| M2| 1| 4| 1|
|2020-08-02| 1| M2| -1| 0| 2|
+----------+----+---+----+----+----+
编辑 2:
df1_schema = StructType([StructField("Date", StringType(), True),\
StructField("col1", IntegerType(), True),\
StructField("id", StringType(), True),\
StructField("col2", IntegerType(), True),\
StructField("col3", IntegerType(), True),\
StructField("col4", IntegerType(), True),\
StructField("coln", IntegerType(), True)])
df_data = [('2020-08-01',0,'M1',3,3,2,2),('2020-08-02',0,'M1',2,3,0,1),\
('2020-08-03',0,'M1',3,3,2,3),('2020-08-04',0,'M1',3,3,2,1),\
('2020-08-01',0,'M2',1,3,3,1),('2020-08-02',0,'M2',-1,3,1,2)]
rdd = sc.parallelize(df_data)
df1 = sqlContext.createDataFrame(df_data, df1_schema)
df1 = df1.withColumn("Date",to_date("Date", 'yyyy-MM-dd'))
df1.show()
+----------+----+---+----+----+----+----+
| Date|col1| id|col2|col3|col4|coln|
+----------+----+---+----+----+----+----+
|2020-08-01| 0| M1| 3| 3| 2| 2|
|2020-08-02| 0| M1| 2| 3| 0| 1|
|2020-08-03| 0| M1| 3| 3| 2| 3|
|2020-08-04| 0| M1| 3| 3| 2| 1|
|2020-08-01| 0| M2| 1| 3| 3| 1|
|2020-08-02| 0| M2| -1| 3| 1| 2|
+----------+----+---+----+----+----+----+
所以让我们说 coln = col4 - col2 然后
+----------+----+---+----+----+----+----+
| Date|col1| id|col2|col3|col4|coln|
+----------+----+---+----+----+----+----+
|2020-08-01| 0| M1| 3| 3| 2| -1|
|2020-08-02| -1| M1| 2| 1| 0| -2|
|2020-08-03| -2| M1| 3| 1| 2| -1|
|2020-08-04| -1| M1| 3| 2| 2| -1|
|2020-08-01| 0| M2| 1| 1| 3| 2|
|2020-08-02| 2| M2| -1| 1| 1| 2|
+----------+----+---+----+----+----+----+

最佳答案

这是您可以使用 SparkSQL 内置函数处理的一类问题 aggregate (需要 Spark 2.4+ ),下面概述了基本思想:

from pyspark.sql.functions import sort_array, collect_list, struct, to_date

cols = ['Date', 'col1', 'col2', 'col3', 'coln']

df_new = df1.groupby('id') \
.agg(sort_array(collect_list(struct(*cols))).alias('dta')) \
.selectExpr("id", """
inline(
aggregate(
/* expr: iterate through the array `dta` from the 2nd to the last items*/
slice(dta,2,size(dta)-1),
/* start: AKA. the zero value which is an array of structs
* with a single element dta[0]
*/
array(dta[0]),
/* merge: do the calculations */
(acc, x) ->
concat(acc, array(named_struct(
'Date', x.Date,
'col1', element_at(acc, -1).coln,
'col2', x.col2,
'col3', element_at(acc, -1).col3 + x.col2,
'coln', x.col3 - x.col2
)))
)
)
""")
输出:
df_new.show()
+---+----------+----+----+----+----+
| id| Date|col1|col2|col3|coln|
+---+----------+----+----+----+----+
| M1|2020-08-01| 0| 3| 3| 2|
| M1|2020-08-02| 2| 2| 5| 1|
| M1|2020-08-03| 1| 3| 8| 0|
| M1|2020-08-04| 0| 3| 11| 0|
| M2|2020-08-01| 0| 1| 3| 1|
| M2|2020-08-02| 1| -1| 2| 4|
+---+----------+----+----+----+----+
哪里:
  • 我们分组相同的行 id并按 Date 对它们进行排序, 将生成的结构数组命名为 dta
  • 在聚合函数中,我们初始化 acc带有结构数组 array(dta[0])然后遍历数组 dta使用 slice 从第二个项目到最后一个项目功能
  • merge聚合函数的一部分,可以使用x.col1 , x.coln等引用相同日期的值并使用 element_at(acc, -1).col1 , element_at(acc, -1).coln等以引用前一个日期的值。
  • 在合并函数中,我们使用 concat(acc, array(...))将一个新元素附加到结构数组 acc
  • 使用 inline用于分解上述结构数组的函数 acc
  • 这个假设日期是连续的,如果缺少日期存在,您可以添加一些 IF 条件。例如计算col3以下:
    IF(datediff(x.Date, element_at(acc, -1).Date) = 1, element_at(acc, -1).coln, 0) + x.col2

  • 顺便提一句。我没有使用示例 coln = col4 - col2 , 使用 con3 = col3_prev + col2相反,我认为,这是一个更好的例子。

    关于pyspark - 如何在pyspark数据帧(时间序列)中计算每日基础,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64144891/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com