gpt4 book ai didi

python - Pyspark - 在 map 转换中使用自定义函数

转载 作者:太空宇宙 更新时间:2023-11-03 16:30:04 24 4
gpt4 key购买 nike

我使用 py.test 运行以下文件(名为 test_func.py):

import findspark
findspark.init()
from pyspark.context import SparkContext

def filtering(data):
return data.map(lambda p: modif(p)).count()

def modif(row):
row.split(",")

class Test(object):
sc = SparkContext('local[1]')

def test_filtering(self):
data = self.sc.parallelize(['1','2', ''])
assert filtering(data) == 2

并且,由于 modif 函数在 map 转换内部使用,因此会失败并出现以下错误:

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/osboxes/spark-1.5.2-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/home/osboxes/spark-1.5.2-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/home/osboxes/spark-1.5.2-bin-hadoop2.4/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
ImportError: No module named clustering.test_func

pyspark 无法找到 modif 函数。请注意,文件 test_func.py 位于目录 clustering 中,我从 clustering 目录中运行 py.test .

令我惊讶的是,如果我在 map 之外使用 modif 函数,它工作得很好。例如,如果我这样做: modif(data.first())

知道为什么我会遇到这样的导入错误以及如何修复它吗?

<小时/>

编辑

  1. 我已经测试了 Avihoo Mamka 的答案,即将 test_func.py 添加到复制到所有工作人员的文件中。然而,并没有什么效果。这对我来说并不奇怪,因为我认为创建 Spark 应用程序的主文件总是发送给所有工作人员。
  2. 我认为这可能来自于 pyspark 正在寻找 clustering.test_func 而不是 test_func

最佳答案

这里的关键是你得到的Traceback

PySpark 告诉您工作进程无权访问 clustering.test_func.py。当您初始化 SparkContext 时,您可以传递应复制到工作线程的文件列表:

sc = SparkContext("local[1]", "App Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg'])

更多信息:https://spark.apache.org/docs/1.5.2/programming-guide.html

关于python - Pyspark - 在 map 转换中使用自定义函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37673462/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com