gpt4 book ai didi

python 到 pyspark,转换 pyspark 中的枢轴

转载 作者:太空宇宙 更新时间:2023-11-04 04:47:09 25 4
gpt4 key购买 nike

我有以下 DataFrame 并在 python 中实现了所需的输出。但我想将其转换为 pyspark。

d = {'user': ['A', 'A', 'B','B','C', 'D', 'C', 'E', 'D', 'E', 'F', 'F'], 'songs' : [11,22,99,11,11,44,66,66,33,55,11,77]}
data = pd.DataFrame(data = d)


e = {'user': ['A', 'B','C', 'D', 'E', 'F','A'], 'cluster': [1,2,3,1,2,3,2]}
clus = pd.DataFrame(data= e)

期望的输出:我想获得特定集群的用户没有听过的所有歌曲。 A 属于集群 1,集群 1 有歌曲 [11,22,33,44] 所以 A 没有听 [33,44] 所以我使用下面的 python 代码实现了这一点。

user
A [33, 44]
B [55, 66]
C [77]
D [11, 22]
E [11, 99]
F [66]

Python 代码:

df = pd.merge(data, clus, on='user', how='left').drop_duplicates(['user','movie'])

df1 = (df.groupby(['cluster']).apply(lambda x: x.pivot('user','movie','cluster').isnull())
.fillna(False)
.reset_index(level=0, drop=True)
.sort_index())

s = np.where(df1, ['{}'.format(x) for x in df1.columns], '')

#remove empty values
s1 = pd.Series([''.join(x).strip(', ') for x in s], index=df1.index)
print (s1)

在 pyspark 分布式编码中很想实现同样的功能?

最佳答案

可能有比这更好的解决方案,但它确实有效。

假设每个用户只属于一个集群,

import pyspark.sql.functions as F
from pyspark.sql.types import *

d = zip(['A', 'A', 'B','B','C', 'D', 'C', 'E', 'D', 'E', 'F', 'F'],[11,22,99,11,11,44,66,66,33,55,11,77])
data = sql.createDataFrame(d).toDF('user','songs')

这给出了,

+----+-----+
|user|songs|
+----+-----+
| A| 11|
| A| 22|
| B| 99|
| B| 11|
| C| 11|
| D| 44|
| C| 66|
| E| 66|
| D| 33|
| E| 55|
| F| 11|
| F| 77|
+----+-----+

创建集群假设每个用户只属于一个集群,

c = zip(['A', 'B','C', 'D',  'E', 'F'],[1,2,3,1,2,3])
clus = sql.createDataFrame(c).toDF('user','cluster')
clus.show()

+----+-------+
|user|cluster|
+----+-------+
| A| 1|
| B| 2|
| C| 3|
| D| 1|
| E| 2|
| F| 3|
+----+-------+

现在,我们获取用户听到的所有歌曲以及他们的集群,

all_combine = data.groupBy('user').agg(F.collect_list('songs').alias('songs'))\
.join(clus, data.user==clus.user).select(data['user'],'songs','cluster')
all_combine.show()
+----+--------+-------+
|user| songs|cluster|
+----+--------+-------+
| F|[11, 77]| 3|
| E|[66, 55]| 2|
| B|[99, 11]| 2|
| D|[44, 33]| 1|
| C|[11, 66]| 3|
| A|[11, 22]| 1|
+----+--------+-------+

最后,计算集群中用户听到的所有歌曲以及随后该集群中用户未听到的所有歌曲,

not_listened = F.udf(lambda song,all_: list(set(all_) - set(song)) , ArrayType(IntegerType()))

grouped_clusters = data.join(clus, data.user==clus.user).select(data['user'],'songs','cluster')\
.groupby('cluster').agg(F.collect_list('songs').alias('all_songs'))\
.join(all_combine, ['cluster']).select('user', all_combine['cluster'], 'songs', 'all_songs')\
.select('user', not_listened(F.col('songs'), F.col('all_songs')).alias('not_listened'))
grouped_clusters.show()

我们得到输出,

+----+------------+                                                             
|user|not_listened|
+----+------------+
| D| [11, 22]|
| A| [33, 44]|
| F| [66]|
| C| [77]|
| E| [99, 11]|
| B| [66, 55]|
+----+------------+

关于python 到 pyspark,转换 pyspark 中的枢轴,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49272209/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com