
python - PySpark DataFrame: Change cell value based on min/max condition in another column


I have the following PySpark DataFrame:

+------------------+------------------+--------------------+--------------+-------+
| col1| col2| col3| X| Y|
+------------------+------------------+--------------------+--------------+-------+
|2.1729247374294496| 3.558069532647046| 6.607603368496324| 1| null|
|0.2654841575294071|1.2633077949463256|0.023578679968183733| 0| null|
|0.4253301781296708|3.4566490739823483| 0.11711202266039554| 3| null|
| 2.608497168338446| 3.529397129549324| 0.373034222141551| 2| null|
+------------------+------------------+--------------------+--------------+-------+

This is a fairly simple operation that I could easily do with pandas. However, I need to do it using only PySpark.

I want to do the following (written as pseudocode):

In the row where col3 == max(col3), change Y from null to 'K'.

Among the remaining rows, in the row where col1 == max(col1), change Y from null to 'Z'.

Among the remaining rows, in the row where col1 == min(col1), change Y from null to 'U'.

In the remaining row, change Y from null to 'I'.

So the expected output is:

+------------------+------------------+--------------------+--------------+-------+
| col1| col2| col3| X| Y|
+------------------+------------------+--------------------+--------------+-------+
|2.1729247374294496| 3.558069532647046| 6.607603368496324| 1| K|
|0.2654841575294071|1.2633077949463256|0.023578679968183733| 0| U|
|0.4253301781296708|3.4566490739823483| 0.11711202266039554| 3| I|
| 2.608497168338446| 3.529397129549324| 0.373034222141551| 2| Z|
+------------------+------------------+--------------------+--------------+-------+

Once that is done, I need to use this table as a lookup against another table:

+--------------------+--------+-----+------------------+--------------+------------+
| x1| x2| x3| x4| X| d|
+--------------------+--------+-----+------------------+--------------+------------+
|0057f68a-6330-42a...| 2876| 30| 5.989999771118164| 0| 20171219|
|05cc0191-4ee4-412...| 108381| 34|24.979999542236328| 3| 20171219|
|06f353af-e9d3-4d0...| 118798| 34| 0.0| 3| 20171219|
|0c69b607-112b-4f3...| 20993| 34| 0.0| 0| 20171219|
|0d1b52ba-1502-4ff...| 23817| 34| 0.0| 0| 20171219|

I want to use the first table as a lookup to create a new column in the second table. The values for the new column should be looked up in column Y of the first table, using the X column of the second table as the key (that is, for each X value coming from the second table, we look up the matching Y value in the first table).

UPD: I need a solution that is robust to a single row satisfying both conditions, for example:

+------------------+------------------+--------------------+--------------+-------+
| col1| col2| col3| X| Y|
+------------------+------------------+--------------------+--------------+-------+
| 2.608497168338446| 3.558069532647046| 6.607603368496324| 1| null|
|0.2654841575294071|1.2633077949463256|0.023578679968183733| 0| null|
|0.4253301781296708|3.4566490739823483| 0.11711202266039554| 3| null|
|2.1729247374294496| 3.529397129549324| 0.373034222141551| 2| null|
+------------------+------------------+--------------------+--------------+-------+

In this case, row 0 satisfies both the max('col3') and the max('col1') conditions.

So what needs to happen is:

Row 0 becomes 'K'.

Row 3 becomes 'Z' (because among the remaining rows, now that row 0 already has 'K', row 3 satisfies the max('col1') condition).

Row 1 becomes 'U'.

Row 2 becomes 'I'.

I cannot have more than one row containing 'I' in table 1.

Best Answer

Compute the aggregates:

from pyspark.sql import functions as F

df = spark.createDataFrame([
    (2.1729247374294496, 3.558069532647046, 6.607603368496324, 1),
    (0.2654841575294071, 1.2633077949463256, 0.023578679968183733, 0),
    (0.4253301781296708, 3.4566490739823483, 0.11711202266039554, 3),
    (2.608497168338446, 3.529397129549324, 0.373034222141551, 2)
], ("col1", "col2", "col3", "x"))

min1, max1, max3 = df.select(F.min("col1"), F.max("col1"), F.max("col3")).first()
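
With the sample data above, these aggregates should evaluate to the values below (a quick sanity check, simply read off the table):

print(min1, max1, max3)
# 0.2654841575294071 2.608497168338446 6.607603368496324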

Add the column using when:

y = (F.when(F.col("col3") == max3, "K")   # In row where col3 == max(col3), change Y from null to 'K'
     .when(F.col("col1") == max1, "Z")    # In the remaining rows, in the row where col1 == max(col1), change Y from null to 'Z'
     .when(F.col("col1") == min1, "U")    # In the remaining rows, in the row where col1 == min(col1), change Y from null to 'U'
     .otherwise("I"))                     # In the remaining row: change Y from null to 'I'

df_with_y = df.withColumn("y", y)


df_with_y.show()
# +------------------+------------------+--------------------+---+---+
# | col1| col2| col3| x| y|
# +------------------+------------------+--------------------+---+---+
# |2.1729247374294496| 3.558069532647046| 6.607603368496324| 1| K|
# |0.2654841575294071|1.2633077949463256|0.023578679968183733| 0| U|
# |0.4253301781296708|3.4566490739823483| 0.11711202266039554| 3| I|
# | 2.608497168338446| 3.529397129549324| 0.373034222141551| 2| Z|
# +------------------+------------------+--------------------+---+---+

As for the lookup, where the values for the new column should be taken from column Y of the first table using the X column of the second table as the key, a simple join does it:

df_with_y.select("x", "y").join(df2, ["x"])
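
Here `df2` stands for the second table. A minimal sketch of the lookup, assuming a hypothetical `df2` reduced to just an id column and the key column for illustration (the real table has columns x1..x4, X and d):

# Hypothetical stand-in for the second table, keeping only an id and the key
df2 = spark.createDataFrame([
    ("0057f68a", 0),
    ("05cc0191", 3),
    ("06f353af", 3),
    ("0c69b607", 0)
], ("x1", "x"))

# Join on the key column to pull the label y into the second table
df2.join(df_with_y.select("x", "y"), on="x", how="left").show()
# (row order may differ)
# +---+--------+---+
# |  x|      x1|  y|
# +---+--------+---+
# |  0|0057f68a|  U|
# |  0|0c69b607|  U|
# |  3|05cc0191|  I|
# |  3|06f353af|  I|
# +---+--------+---+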

If y already exists and you want to preserve its non-null values:

df_ = spark.createDataFrame([
    (2.1729247374294496, 3.558069532647046, 6.607603368496324, 1, "G"),
    (0.2654841575294071, 1.2633077949463256, 0.023578679968183733, 0, None),
    (0.4253301781296708, 3.4566490739823483, 0.11711202266039554, 3, None),
    (2.608497168338446, 3.529397129549324, 0.373034222141551, 2, None)
], ("col1", "col2", "col3", "x", "y"))

min1_, max1_, max3_ = df_.filter(F.col("y").isNull()).select(F.min("col1"), F.max("col1"), F.max("col3")).first()

y_ = (F.when(F.col("col3") == max3_, "K")
      .when(F.col("col1") == max1_, "Z")
      .when(F.col("col1") == min1_, "U")
      .otherwise("I"))

df_.withColumn("y", F.coalesce(F.col("y"), y_)).show()


# +------------------+------------------+--------------------+---+---+
# | col1| col2| col3| x| y|
# +------------------+------------------+--------------------+---+---+
# |2.1729247374294496| 3.558069532647046| 6.607603368496324| 1| G|
# |0.2654841575294071|1.2633077949463256|0.023578679968183733| 0| U|
# |0.4253301781296708|3.4566490739823483| 0.11711202266039554| 3| I|
# | 2.608497168338446| 3.529397129549324| 0.373034222141551| 2| K|
# +------------------+------------------+--------------------+---+---+
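
The same idea, applied repeatedly, also covers the update in the question where a single row satisfies both the max('col3') and max('col1') conditions: after each label is assigned, the aggregates are recomputed only over the rows that are still unlabeled. A minimal sketch under that assumption (the function name and the step list below are not from the original answer, just one way to express the repetition):

def assign_labels(df):
    # Start with an all-null label column
    df = df.withColumn("y", F.lit(None).cast("string"))
    # (column, aggregate, label) applied in order, each time over unlabeled rows only
    steps = [("col3", F.max, "K"), ("col1", F.max, "Z"), ("col1", F.min, "U")]
    for col, agg, label in steps:
        value = df.filter(F.col("y").isNull()).select(agg(col)).first()[0]
        df = df.withColumn(
            "y",
            F.when(F.col("y").isNull() & (F.col(col) == value), label).otherwise(F.col("y")))
    # Whatever is still unlabeled becomes 'I'
    return df.withColumn("y", F.coalesce(F.col("y"), F.lit("I")))

assign_labels(df).show()

Because each aggregate is taken over the not-yet-labeled rows, a row that is both max('col3') and max('col1') only consumes the 'K' label, and 'Z' then goes to the largest col1 among the remaining rows, as requested in the update.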

If you run into numerical precision issues, you can try:

threshold = 0.0000001  # choose a threshold appropriate for your data

y_t = (F.when(F.abs(F.col("col3") - max3) < threshold, "K")   # In row where col3 == max(col3), change Y from null to 'K'
       .when(F.abs(F.col("col1") - max1) < threshold, "Z")    # In the remaining rows, in the row where col1 == max(col1), change Y from null to 'Z'
       .when(F.abs(F.col("col1") - min1) < threshold, "U")    # In the remaining rows, in the row where col1 == min(col1), change Y from null to 'U'
       .otherwise("I"))                                       # In the remaining row: change Y from null to 'I'

df.withColumn("y", y_t).show()
# +------------------+------------------+--------------------+---+---+
# | col1| col2| col3| x| y|
# +------------------+------------------+--------------------+---+---+
# |2.1729247374294496| 3.558069532647046| 6.607603368496324| 1| K|
# |0.2654841575294071|1.2633077949463256|0.023578679968183733| 0| U|
# |0.4253301781296708|3.4566490739823483| 0.11711202266039554| 3| I|
# | 2.608497168338446| 3.529397129549324| 0.373034222141551| 2| Z|
# +------------------+------------------+--------------------+---+---+

Regarding python - PySpark DataFrame: Change cell value based on min/max condition in another column, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/48423534/
