gpt4 book ai didi

python - Spark 蟒 : How to calculate Jaccard Similarity between each line within an RDD?

转载 作者:太空宇宙 更新时间:2023-11-04 04:29:33 25 4
gpt4 key购买 nike

我有一个包含大约 5 万行和 2 列的表格。你可以认为每一行是一部电影,列是该电影的属性 - “ID”:该电影的 ID,“标签”:电影的一些内容标签,以字符串列表的形式每部电影

数据看起来像这样:

movie_1, ['浪漫','喜剧','英语'];movie_2, ['action','kongfu','Chinese']

我的目标是首先根据每个电影对应的标签计算提花相似度,一旦完成,我就能知道每个电影(例如我选择 movie_1),其他的是什么与这部电影最相似的前 5 部电影(在本例中为 movie_1)。我不仅想要 movie_1 本身的前 5 个结果,而且要获得所有电影的前 5 个结果。

我曾尝试使用 Python 来解决这个问题,但是运行时是一个很大的挑战。即使我使用多处理,在 6 个内核上运行,总运行时间仍然持续 20 多个小时。

Python 代码如下:

import pandas as pd
from collections import Counter
import numpy as np
from multiprocessing import Pool
import time

col_names=['movie_id','tag_name']
df=pd.read_csv("movies.csv",names=col_names)
movie_ids=df['movie_id'].tolist()
tag_list=df['tag_name'].tolist()

def jaccard_similarity(string1, string2):
intersection = set(string1).intersection(set(string2))
union = set(string1).union(set(string2))
return len(intersection)/float(len(union))

def jc_results(movie_id):
result=Counter()
this_index=movie_ids.index(movie_id)
for another_id in movie_ids:
that_index=movie_ids.index(another_id)
if another_id==movie_id:
continue
else:
tag_1=tag_list[this_index]
tag_2=tag_list[that_index]
jaccard = jaccard_similarity(tag_1,tag_2)
result[(movie_id,another_id)]=jaccard
return result.most_common(10)


from multiprocessing import Pool
pool=Pool(6)
results={}
for movie_id in movie_ids:
results[movie_id]=pool.apply_async(jc_results,args=(movie_id,))
pool.close()
pool.join()
for movie_id, res in results.items():
results[movie_id] = res.get()

然后我想切换到 Pyspark,但是我对 spark python 还是很陌生,用它写了几行后就卡住了,实际上我唯一取得的进步是使用 sc.textFile 将数据读入 RDD ...已阅读现有帖子,但他们都在使用 Scala。如果有人可以帮助或提供 Pyspark 的任何指导,那就太好了。非常感谢!

最佳答案

您可以尝试类似于 this stackoverflow answer 的解决方案,但由于您的数据已经标记化(字符串列表),因此您不需要执行该步骤或 ngram 步骤。

我还会提到 pyspark 中的 approxSimilarityJoin 计算 Jaccard 距离而不是 Jaccard 相似度,但如果您特别需要,您可以从 1 中减去以转换回相似度。

您的代码最终看起来类似于:

from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, MinHashLSH
import pyspark.sql.functions as f

db = spark.createDataFrame([
('movie_1', ['romantic','comedy','English']),
('movie_2', ['action','kongfu','Chinese']),
('movie_3', ['romantic', 'action'])
], ['movie_id', 'genres'])


model = Pipeline(stages=[
HashingTF(inputCol="genres", outputCol="vectors"),
MinHashLSH(inputCol="vectors", outputCol="lsh", numHashTables=10)
]).fit(db)

db_hashed = model.transform(db)

db_matches = model.stages[-1].approxSimilarityJoin(db_hashed, db_hashed, 0.9)

#show all matches (including duplicates)
db_matches.select(f.col('datasetA.movie_id').alias('movie_id_A'),
f.col('datasetB.movie_id').alias('movie_id_B'),
f.col('distCol')).show()

#show non-duplicate matches
db_matches.select(f.col('datasetA.movie_id').alias('movie_id_A'),
f.col('datasetB.movie_id').alias('movie_id_B'),
f.col('distCol')).filter('movie_id_A < movie_id_B').show()

对应的输出:

+----------+----------+-------+
|movie_id_A|movie_id_B|distCol|
+----------+----------+-------+
| movie_3| movie_3| 0.0|
| movie_1| movie_3| 0.75|
| movie_2| movie_3| 0.75|
| movie_1| movie_1| 0.0|
| movie_2| movie_2| 0.0|
| movie_3| movie_2| 0.75|
| movie_3| movie_1| 0.75|
+----------+----------+-------+

+----------+----------+-------+
|movie_id_A|movie_id_B|distCol|
+----------+----------+-------+
| movie_1| movie_3| 0.75|
| movie_2| movie_3| 0.75|
+----------+----------+-------+

关于python - Spark 蟒 : How to calculate Jaccard Similarity between each line within an RDD?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52923110/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com