gpt4 book ai didi

python - 根据条件创建新列

转载 作者:行者123 更新时间:2023-12-01 00:09:06 25 4
gpt4 key购买 nike

我正在尝试找出如何将 Pandas 使用函数转换为 PySpark。

我有一个像这样的 Pandas DataFrame:

+---+----+
|num| val|
+---+----+
| 1| 0.0|
| 2| 0.0|
| 3|48.6|
| 4|49.0|
| 5|48.7|
| 6|49.1|
| 7|74.5|
| 8|48.7|
| 9| 0.0|
| 10|49.0|
| 11| 0.0|
| 12| 0.0|
+---+----+

下面代码片段中的代码相当简单。它继续前进,直到找到非零值。如果没有它们,它会出于相同的目的而倒退

def next_non_zero(data,i,column):
for j in range(i+1,len(data[column])):
res = data[column].iloc[j]
if res !=0:
return res
for j in range(i-1,0,-1):
res = data[column].iloc[j]
if res !=0:
return res

def fix_zero(data, column):
for i, row in data.iterrows():
if (row[column] == 0):
data.at[i,column] = next_non_zero(data,i,column)

因此我希望看到的结果

+---+----+
|num| val|
+---+----+
| 1|48.6|
| 2|48.6|
| 3|48.6|
| 4|49.0|
| 5|48.7|
| 6|49.1|
| 7|74.5|
| 8|48.7|
| 9|49.0|
| 10|49.0|
| 11|49.0|
| 12|49.0|
+---+----+

所以我确实明白,在 PySpark 中,我必须创建一个具有所需结果的新列,并使用例如 withColumn() 替换现有列。但是,我不明白如何正确迭代 DataFrame。

我正在尝试在 Window 上使用函数:

my_window = Window.partitionBy().orderBy('num')
df = df.withColumn('new_val', F.when(df.val==0,F.lead(df.val).over(my_window)).
otherwise(F.lag(df.val).over(my_window))

显然,它没有为我提供所需的结果,因为它只迭代一次。所以我尝试编写一些 udf 递归,例如

def fix_zero(param):

return F.when(F.lead(param).over(my_window)!=0,F.lead(param).over(my_window)).
otherwise(fix_zero(F.lead(param).over(my_window)))

spark_udf = udf(fix_zero, DoubleType())
df = df.withColumn('new_val', F.when(df.val!=0, df.val).otherwise(fix_zero('val')))

我得到了

RecursionError: maximum recursion depth exceeded in comparison

我怀疑这是因为我传递给递归的不是一行,而是 Lead() 的结果不管怎样,我现在完全陷入了这个障碍,非常感谢任何建议

最佳答案

Window 有一种方法可以遍历所有前面的行(或所有后面的行),直到达到非空值。

所以我的第一步是将所有 0 值替换为 null

重新创建数据框:

values = [
(1, 0.0),
(2,0.0),
(3,48.6),
(4,49.0),
(5,48.7),
(6,49.1),
(7, 74.5),
(8,48.7),
(9,0.0),
(10,49.0),
(11,0.0),
(12,0.0)
]

df = spark.createDataFrame(values, ['num','val'])

用 null 替换 0

from pyspark.sql.functions import when, lit, col
df= df.withColumn('val_null', when(col('val') != 0.0,col('val')))

然后定义窗口,结合first和null,将允许我们获取行前的最后一个非空值和行后的第一个非空值

from pyspark.sql import Window
from pyspark.sql.functions import last,first,coalesce


windowForward = Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
ffilled_column = last(df['val_null'], ignorenulls=True).over(windowForward)

windowBackward = Window.rowsBetween(Window.currentRow,Window.unboundedFollowing)
bfilled_column = first(df['val_null'], ignorenulls=True).over(windowBackward)

# creating new columns in df
df =df.withColumn('ffill',ffilled_column).withColumn('bfill',bfilled_column)

# replace null with bfill if bfill is not null otherwise fill with ffill
df =df.withColumn('val_full',coalesce('bfill','ffill'))

使用此技术,我们在“val_full”列中得到您的预期输出

+---+----+--------+-----+-----+--------+
|num| val|val_null|ffill|bfill|val_full|
+---+----+--------+-----+-----+--------+
| 1| 0.0| null| null| 48.6| 48.6|
| 2| 0.0| null| null| 48.6| 48.6|
| 3|48.6| 48.6| 48.6| 48.6| 48.6|
| 4|49.0| 49.0| 49.0| 49.0| 49.0|
| 5|48.7| 48.7| 48.7| 48.7| 48.7|
| 6|49.1| 49.1| 49.1| 49.1| 49.1|
| 7|74.5| 74.5| 74.5| 74.5| 74.5|
| 8|48.7| 48.7| 48.7| 48.7| 48.7|
| 9| 0.0| null| 48.7| 49.0| 49.0|
| 10|49.0| 49.0| 49.0| 49.0| 49.0|
| 11| 0.0| null| 49.0| null| 49.0|
| 12| 0.0| null| 49.0| null| 49.0|
+---+----+--------+-----+-----+--------+

关于python - 根据条件创建新列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59752893/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com