gpt4 book ai didi

PySpark-将 map 功能添加为列

转载 作者:行者123 更新时间:2023-12-04 16:42:10 27 4
gpt4 key购买 nike

我有一个pyspark DataFrame

a = [
('Bob', 562),
('Bob',880),
('Bob',380),
('Sue',85),
('Sue',963)
]
df = spark.createDataFrame(a, ["Person", "Amount"])

我需要创建一个对 Amount进行哈希处理并返回金额的列。问题是我不能使用 UDF,所以我使用了映射功能。

df.rdd.map(lambda x: hash(x["Amount"]))

最佳答案

如果您不能使用udf,则可以使用map函数,但是正如您当前编写的那样,将只有一列。要保留所有列,请执行以下操作:

df = df.rdd\
.map(lambda x: (x["Person"], x["Amount"], hash(str(x["Amount"]))))\
.toDF(["Person", "Amount", "Hash"])

df.show()
#+------+------+--------------------+
#|Person|Amount| Hash|
#+------+------+--------------------+
#| Bob| 562|-4340709941618811062|
#| Bob| 880|-7718876479167384701|
#| Bob| 380|-2088598916611095344|
#| Sue| 85| 7168043064064671|
#| Sue| 963|-8844931991662242457|
#+------+------+--------------------+

注意:在这种情况下, hash(x["Amount"])不是很有趣,因此我将其更改为哈希 Amount转换为字符串。

本质上,您必须将行映射到包含所有现有列的元组,然后添加新列。

如果您的列太多而无法枚举,则也可以只在现有行中添加一个元组。

df = df.rdd\
.map(lambda x: x + (hash(str(x["Amount"])),))\
.toDF(df.columns + ["Hash"])\

我还应该指出,如果哈希值是您的最终目标,那么还有一个pyspark函数 pyspark.sql.functions.hash 可以用来避免序列化到 rdd:

import pyspark.sql.functions as f
df.withColumn("Hash", f.hash("Amount")).show()
#+------+------+----------+
#|Person|Amount| Hash|
#+------+------+----------+
#| Bob| 562| 51343841|
#| Bob| 880|1241753636|
#| Bob| 380| 514174926|
#| Sue| 85|1944150283|
#| Sue| 963|1665082423|
#+------+------+----------+

这似乎使用了与python内置函数不同的哈希算法。

关于PySpark-将 map 功能添加为列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49879506/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com