
python - PySpark: Using objects in an RDD


I am currently learning Python and want to apply it on/with Spark. I have this very simple (and useless) script:

import sys
from pyspark import SparkContext

class MyClass:
    def __init__(self, value):
        self.v = str(value)

    def addValue(self, value):
        self.v += str(value)

    def getValue(self):
        return self.v

if __name__ == "__main__":
    if len(sys.argv) != 1:
        print("Usage CC")
        exit(-1)

    data = [1, 2, 3, 4, 5, 2, 5, 3, 2, 3, 7, 3, 4, 1, 4]
    sc = SparkContext(appName="WordCount")
    d = sc.parallelize(data)
    inClass = d.map(lambda input: (input, MyClass(input)))
    reduzed = inClass.reduceByKey(lambda a, b: a.addValue(b.getValue))
    print(reduzed.collect())

When executing it with

spark-submit CustomClass.py

..the following error occurs (output shortened):

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 133, in dump_stream
    for obj in iterator:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1728, in add_shuffle_key
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 415, in dumps
    return pickle.dumps(obj, protocol)
PicklingError: Can't pickle __main__.MyClass: attribute lookup __main__.MyClass failed
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)...

To me, the statement

PicklingError: Can't pickle __main__.MyClass: attribute lookup __main__.MyClass failed

seems to be the important one. It means that class instances can't be serialized, right? Do you know how to solve this?

Thanks and regards

Best Answer

There are a number of issues:

  • If you put MyClass in a separate file it can be pickled. This is a common problem with Python's use of pickle: it stores a reference to the class by module and name instead of shipping the class definition (see the sketch after this list). It is easily fixed by moving MyClass out and using from myclass import MyClass. Normally dill can fix such issues (as in import dill as pickle), but it did not work for me here.
  • Once that is solved, your reduce will not work, because calling addValue returns None (there is no return), not an instance of MyClass. You need to change addValue to return self.
  • Finally, the lambda needs to call getValue, so it should be a.addValue(b.getValue()).
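Why moving the class to its own module helps: pickle writes a reference to the defining module and class name plus the instance's __dict__, not the class body itself. On a Spark worker the driver's __main__ module is not importable, so the lookup of __main__.MyClass fails. A minimal sketch (plain CPython, outside Spark) that makes the stored reference visible:

import pickle
import pickletools

class MyClass:
    def __init__(self, value):
        self.v = str(value)

# Only a reference such as '__main__' / 'MyClass' plus the instance's __dict__
# is written to the pickle stream; the class definition itself is not shipped.
payload = pickle.dumps(MyClass(1))
pickletools.dis(payload)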

Putting it all together: myclass.py

class MyClass:
    def __init__(self, value):
        self.v = str(value)

    def addValue(self, value):
        self.v += str(value)
        return self

    def getValue(self):
        return self.v

main.py

import sys
from pyspark import SparkContext
from myclass import MyClass

if __name__ == "__main__":
    if len(sys.argv) != 1:
        print("Usage CC")
        exit(-1)

    data = [1, 2, 3, 4, 5, 2, 5, 3, 2, 3, 7, 3, 4, 1, 4]
    sc = SparkContext(appName="WordCount")
    d = sc.parallelize(data)
    inClass = d.map(lambda input: (input, MyClass(input)))
    reduzed = inClass.reduceByKey(lambda a, b: a.addValue(b.getValue()))
    print(reduzed.collect())
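Running this locally with spark-submit main.py typically picks up myclass.py from the working directory; when submitting to a cluster the extra module usually has to be shipped to the executors as well, for example (a sketch using the file names above):

spark-submit --py-files myclass.py main.py

Since every value for a key x starts as str(x), each key should be reduced to a single MyClass instance whose getValue() returns the concatenated digits (e.g. '3333' for the four occurrences of 3); note that the order of the pairs returned by collect() is not guaranteed.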

Regarding python - PySpark: Using objects in an RDD, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/33639009/
