gpt4 book ai didi

apache-spark - 使 Python 字典可用于所有 spark 分区

转载 作者:行者123 更新时间:2023-12-05 01:00:07 24 4
gpt4 key购买 nike

我正在尝试在 pyspark 中开发一种算法,我正在使用 linalg.SparseVector 类。我需要创建一个键值对字典作为每个 SparseVector 对象的输入。这里的键必须是整数,因为它们代表整数(在我的例子中代表用户 ID)。我有一个单独的方法来读取输入文件并返回一个字典,其中每个用户 ID(字符串)都映射到一个整数索引。当我再次浏览文件并执行

FileRdd.map( lambda x: userid_idx[ x[0] ] ) 。我收到一个 KeyError。我认为这是因为我的 dict 对所有分区都不可用。有没有办法让 userid_idx dict 可用于类似于 MapReduce 中的分布式映射的所有分区?我也为这次的困惑道歉。我正在使用我的手机发布此信息。将在一段时间内从我的笔记本电脑更新。

promise 的代码:

from pyspark.mllib.linalg import SparseVector
from pyspark import SparkContext
import glob
import sys
import time
"""We create user and item indices starting from 0 to #users and 0 to #items respectively. This is done to store them in sparseVectors as dicts."""
def create_indices(inputdir):
items=dict()
user_id_to_idx=dict()
user_idx_to_id=dict()
item_idx_to_id=dict()
item_id_to_idx=dict()
item_idx=0
user_idx=0
for inputfile in glob.glob(inputdir+"/*.txt"):
print inputfile
with open(inputfile) as f:
for line in f:
toks=line.strip().split("\t")
try:
user_id_to_idx[toks[1].strip()]
except KeyError:
user_id_to_idx[toks[1].strip()]=user_idx
user_idx_to_id[user_idx]=toks[1].strip()
user_idx+=1
try:
item_id_to_idx[toks[0].strip()]
except KeyError:
item_id_to_idx[toks[0].strip()]=item_idx
item_idx_to_id[item_idx]=toks[0].strip()
item_idx+=1
return user_idx_to_id,user_id_to_idx,item_idx_to_id,item_id_to_idx,user_idx,item_idx

# pass in the hdfs path to the input files and the spark context.
def runKNN(inputdir,sc,user_id_to_idx,item_id_to_idx):
rdd_text=sc.textFile(inputdir)
try:

new_rdd = rdd_text.map(lambda x: (item_id_to_idx[str(x.strip().split("\t")[0])],{user_id_to_idx[str(x.strip().split("\t")[1])]:1})).reduceByKey(lambda x,y: x.update(y))
except KeyError:
sys.exit(1)
new_rdd.saveAsTextFile("hdfs:path_to_output/user/hadoop/knn/output")

if __name__=="__main__":
sc = SparkContext()
u_idx_to_id,u_id_to_idx,i_idx_to_id,i_id_to_idx,u_idx,i_idx=create_indices(sys.argv[1])
u_idx_to_id_b=sc.broadcast(u_idx_to_id)
u_id_to_idx_b=sc.broadcast(u_id_to_idx)
i_idx_to_idx_b=sc.broadcast(i_idx_to_id)
i_id_to_idx_b=sc.broadcast(i_id_to_idx)
num_users=sc.broadcast(u_idx)
num_items=sc.broadcast(i_idx)
runKNN(sys.argv[1],sc,u_id_to_idx_b.value,i_id_to_idx_b.value)

最佳答案

在 Spark 中,该字典已经可供您使用,就像在所有任务中一样。例如:

dictionary = {1:"red", 2:"blue"}
rdd = sc.parallelize([1,2])
rdd.map(lambda x: dictionary[x]).collect()
# Prints ['red', 'blue']

您可能会发现您的问题实际上是您的字典不包含您要查找的键!

来自 Spark documentation :

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program.



引用的局部变量的副本将与任务一起发送到节点。

Broadcast variables在这里对您没有帮助,它们只是通过每个节点发送一次而不是每个任务一次来提高性能的工具。

关于apache-spark - 使 Python 字典可用于所有 spark 分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30125594/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com