
python-3.x - Get the distinct elements of a column grouped by another column on a PySpark Dataframe


I have a pyspark DF of IDs and purchases that I'm trying to transform for use with FP-Growth. Currently I have multiple rows for a given ID, each row relating to a single purchase.

I'd like to transform this dataframe into a form with two columns: one for the id (one row per id), and a second containing the list of that id's distinct purchases.

I tried using a user-defined function (UDF) to map the distinct purchases onto each distinct ID, but I get "py4j.Py4JException: Method __getstate__([]) does not exist". Thanks to @Mithril I see that "You can't use sparkSession object, spark.DataFrame object or other Spark distributed objects in udf and pandas_udf, because they are not pickled."
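For reference, here's a minimal sketch of the kind of UDF that triggers this error. The original failing code isn't shown, so the lambda below is a hypothetical reconstruction, assuming the spk_df_1 dataframe defined further down:

import pyspark.sql.functions as f
from pyspark.sql.types import ArrayType, StringType

# Hypothetical reconstruction of the failing approach: the lambda closes over
# spk_df_1, a distributed object, so Spark tries to pickle it and fails with
# py4j.Py4JException: Method __getstate__([]) does not exist
get_purchases = f.udf(
    lambda cust_id: spk_df_1.filter(f.col("id") == cust_id)
                            .select("item").distinct()
                            .rdd.map(lambda row: row[0]).collect(),
    ArrayType(StringType()))

spk_df_1.withColumn("items", get_purchases(f.col("id")))  # raises the error above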

So I've implemented the terrible approach below (it works, but isn't scalable):

import pyspark.sql.functions as f
import pandas as pd

# Let's create some fake transactions
customers = [1,2,3,1,1]
purschases = ['cake','tea','beer','fruit','cake']

# Let's create a spark DF to capture the transactions
transactions = zip(customers, purschases)
spk_df_1 = spark.createDataFrame(list(transactions), ["id", "item"])

# Let's have a look at the resulting spark dataframe
spk_df_1.show()

# Register the dataframe as a temp view so it can be queried with SQL
spk_df_1.createOrReplaceTempView("TBLdf")

# Let's capture the ids and the list of their distinct purchases in a
# list of tuples
nums1 = []

# for each distinct id let's get the list of their distinct purchases
# (note: this triggers one Spark job per id, which is why it doesn't scale)
for id in spark.sql("SELECT distinct(id) FROM TBLdf").rdd.map(lambda row: row[0]).collect():
    purschase = spk_df_1.filter(f.col("id") == id).select("item").distinct().rdd.map(lambda row: row[0]).collect()
    nums1.append((id, purschase))

# Let's see what our list of transaction tuples looks like
print(nums1)
print("\n")

# Let's turn the list of transaction tuples into a pandas dataframe
df_pd = pd.DataFrame(nums1)

# Finally let's turn our pandas dataframe into a pyspark Dataframe
df2 = spark.createDataFrame(df_pd)
df2.show()

Output:

+---+-----+
| id| item|
+---+-----+
| 1| cake|
| 2| tea|
| 3| beer|
| 1|fruit|
| 1| cake|
+---+-----+

[(1, ['fruit', 'cake']), (3, ['beer']), (2, ['tea'])]


+---+-------------+
| 0| 1|
+---+-------------+
| 1|[fruit, cake]|
| 3| [beer]|
| 2| [tea]|
+---+-------------+

If anyone has any suggestions, I'd be very grateful.

Best Answer

This is a task for collect_set, which creates a set of items with no duplicates:

import pyspark.sql.functions as F

# Let's create some fake transactions
customers = [1,2,3,1,1]
purschases = ['cake','tea','beer','fruit','cake']

# Let's create a spark DF to capture the transactions
transactions = zip(customers, purschases)
spk_df_1 = spark.createDataFrame(list(transactions), ["id", "item"])
spk_df_1.show()

spk_df_1.groupby('id').agg(F.collect_set('item')).show()

Output:

+---+-----+
| id| item|
+---+-----+
| 1| cake|
| 2| tea|
| 3| beer|
| 1|fruit|
| 1| cake|
+---+-----+

+---+-----------------+
| id|collect_set(item)|
+---+-----------------+
| 1| [fruit, cake]|
| 3| [beer]|
| 2| [tea]|
+---+-----------------+
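
Since the original goal was FP-Growth, the aggregated dataframe can be fed straight into pyspark.ml.fpm.FPGrowth. A minimal sketch; the 'items' alias and the minSupport/minConfidence values are illustrative assumptions, not tuned settings:

from pyspark.ml.fpm import FPGrowth

# Alias the collected column so FPGrowth can find it ('items' is an assumed name)
baskets = spk_df_1.groupby('id').agg(F.collect_set('item').alias('items'))

# minSupport/minConfidence here are placeholder values for illustration
fp = FPGrowth(itemsCol='items', minSupport=0.2, minConfidence=0.5)
model = fp.fit(baskets)
model.freqItemsets.show()
model.associationRules.show()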

Regarding python-3.x - getting the distinct elements of a column grouped by another column on a PySpark Dataframe, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/58846264/
