gpt4 book ai didi

python - Pyspark 在查找前一行时按组迭代数据帧

转载 作者:可可西里 更新时间:2023-11-01 16:28:48 26 4
gpt4 key购买 nike

请帮助我,我是 spark 的新手。下面是我的数据框

type col1 col2 col3
1 0 41 0
1 27 0 0
1 1 0 0
1 183 0 2
2 null 0 0
2 null 10 0
3 0 126 0
3 2 0 1
3 4 0 0
3 5 0 0

下面应该是我的输出

type col1 col2 col3 result
1 0 41 0 0
1 27 0 0 14
1 1 0 0 13
1 183 0 2 -168
2 null 0 0
2 null 10 0
3 0 126 0 0
3 2 0 1 125
3 4 0 0 121
3 5 0 0 116

挑战在于必须对每一组类型的列进行计算,公式类似于 prev(col2)-col1+col3

我尝试在 col2 上使用 window 和 lag 函数来填充结果列,但它没有用。

下面是我的代码

part = Window().partitionBy().orderBy('type')
DF = DF.withColumn('result',lag("col2").over(w)-DF.col1+DF.col3)

现在我正在努力尝试使用 map 功能,请帮忙

最佳答案

逻辑有点棘手和复杂。

您可以在 pyspark 中执行以下操作

pyspark

from pyspark.sql import functions as F
from pyspark.sql import Window
import sys
windowSpec = Window.partitionBy("type").orderBy("type")
df = df.withColumn('result', F.lag(df.col2, 1).over(windowSpec) - df.col1 + df.col3)
df = df.withColumn('result', F.when(df.result.isNull(), F.lit(0)).otherwise(df.result))
df = df.withColumn('result', F.sum(df.result).over(windowSpec.rowsBetween(-sys.maxsize, -1)) + df.result)
df = df.withColumn('result', F.when(df.result.isNull(), F.lit(0)).otherwise(df.result))

斯卡拉

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
val windowSpec = Window.partitionBy("type").orderBy("type")
df.withColumn("result", lag("col2", 1).over(windowSpec) - $"col1"+$"col3")
.withColumn("result", when($"result".isNull, lit(0)).otherwise($"result"))
.withColumn("result", sum("result").over(windowSpec.rowsBetween(Long.MinValue, -1)) +$"result")
.withColumn("result", when($"result".isNull, lit(0)).otherwise($"result"))

您应该得到以下结果。

+----+----+----+----+------+
|type|col1|col2|col3|result|
+----+----+----+----+------+
|1 |0 |41 |0 |0.0 |
|1 |27 |0 |0 |14.0 |
|1 |1 |0 |0 |13.0 |
|1 |183 |0 |2 |-168.0|
|3 |0 |126 |0 |0.0 |
|3 |2 |0 |1 |125.0 |
|3 |4 |0 |0 |121.0 |
|3 |5 |0 |0 |116.0 |
|2 |null|0 |0 |0.0 |
|2 |null|10 |0 |0.0 |
+----+----+----+----+------+

已编辑

第一个 withColumn 应用公式 prev(col2) - col1 + col3。第二个 withColumnresult 列的 null 更改为 0。第三个 withColumn 用于累积和,即添加所有值直到结果列的当前行。所以这三个 withColumn 相当于 prev(col2) + prev(results) 1 col1 + col3。最后一个 withColumnresult 列中的 null 值更改为 0

关于python - Pyspark 在查找前一行时按组迭代数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46313562/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com