
python - Using PySpark's rdd.parallelize().map() on methods of self-implemented objects/classes

Reposted. Author: 行者123. Updated: 2023-12-04 07:47:45

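I have some objects that I want to compute on in parallel, so I thought I could turn to PySpark.
Consider this example: a class whose objects hold a number i that can be squared with square():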

class MyMathObject():
    def __init__(self, i):
        self.i = i

    def square(self):
        return self.i ** 2


print(MyMathObject(3).square()) # Test one instance with regular python - works
I have also set up PySpark (in a Jupyter notebook), and now I want to compute the squares of 0 to 4 in parallel on my objects:
import findspark
findspark.init()
from pyspark import SparkContext

sc = SparkContext("local[2]")

rdd = sc.parallelize([MyMathObject(i) for i in range(5)])
rdd.map(lambda obj: obj.square()).collect() # This fails
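This does not work. It produces a very long error message that is mostly unhelpful to me.
The only line I found somewhat informative is:
AttributeError: Can't get attribute 'MyMathObject' on <module 'pyspark.daemon' from '/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/daemon.py'>
So it seems to have something to do with how the attribute square() is called. I have copied the full message at the end.
PySpark itself seems to work. For example, running the following on a plain Python list returns the squared numbers as expected.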
rdd = sc.parallelize([i for i in range(5)])
rdd.map(lambda i: i**2).collect()
So there seems to be a flaw in the way I create or operate on the objects, but I cannot track down the error.
The full error message:

Py4JJavaError                             Traceback (most recent call last)
in
      1 rdd = sc.parallelize([MyMathObject(i) for i in range(5)])
----> 2 rdd.map(lambda obj: obj.square()).collect()

/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/pyspark/rdd.py in collect(self)
    887         """
    888         with SCCallSiteSync(self.context) as css:
--> 889             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    890         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
    891

/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306

/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 2, 192.168.2.108, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 147, in load_stream
    yield self._read_with_length(stream)
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads
    return pickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute 'MyMathObject' on <module 'pyspark.daemon' from '/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/daemon.py'>

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2154)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2179)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:168)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 147, in load_stream
    yield self._read_with_length(stream)
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads
    return pickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute 'MyMathObject' on <module 'pyspark.daemon' from '/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/daemon.py'>

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2154)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more

Best answer

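It does not look like this is your mistake, but one way to resolve the problem is to put the class in a separate Python file (a module) and import it.
For example, create a file mymodule.py: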

class MyMathObject():
    def __init__(self, i):
        self.i = i

    def square(self):
        return self.i ** 2
In your main script, you can then do:
from mymodule import MyMathObject

rdd = sc.parallelize([MyMathObject(i) for i in range(5)])
rdd.map(lambda obj: obj.square()).collect()
This should give [0, 1, 4, 9, 16].
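As for why moving the class helps: the traceback ends in pickle.loads, and pickle stores an instance as a reference to its class (module name plus class name) together with the instance state, not the class's code. A process that unpickles the object must therefore be able to look the class up under that module name. The sketch below reproduces this without Spark. It is only a stand-in, assuming a fresh Python subprocess is a fair approximation of a Spark Python worker; `run_worker`, `WORKER_SOURCE` and `MODULE_SOURCE` are hypothetical names introduced for illustration, and it is meant to be run as a script.

```python
import os
import pickle
import subprocess
import sys
import tempfile


class MyMathObject:  # defined in the "driver" script itself, as in the notebook
    def __init__(self, i):
        self.i = i

    def square(self):
        return self.i ** 2


# The same class as the text of a standalone module (mirrors mymodule.py).
MODULE_SOURCE = '''\
class MyMathObject:
    def __init__(self, i):
        self.i = i

    def square(self):
        return self.i ** 2
'''

# Stand-in for a worker: a fresh interpreter that receives only pickled bytes.
WORKER_SOURCE = '''\
import pickle, sys
objs = pickle.loads(sys.stdin.buffer.read())
print([obj.square() for obj in objs])
'''


def run_worker(payload, workdir):
    """Unpickle `payload` in a new Python process that can import from `workdir`."""
    env = dict(os.environ, PYTHONPATH=workdir)
    return subprocess.run(
        [sys.executable, "-c", WORKER_SOURCE],
        input=payload, capture_output=True, env=env, cwd=workdir,
    )


with tempfile.TemporaryDirectory() as workdir:
    # Case 1: the class exists only in the driver's own namespace.
    inline = run_worker(pickle.dumps([MyMathObject(i) for i in range(5)]), workdir)

    # Case 2: the class lives in an importable module that the worker can find.
    with open(os.path.join(workdir, "mymodule.py"), "w") as f:
        f.write(MODULE_SOURCE)
    sys.path.insert(0, workdir)
    import mymodule
    sys.path.remove(workdir)
    imported = run_worker(
        pickle.dumps([mymodule.MyMathObject(i) for i in range(5)]), workdir
    )

print("inline class:  ", "ok" if inline.returncode == 0 else "failed")
print("imported class:", imported.stdout.decode().strip())
```

In the first case the subprocess should fail while unpickling, because it never saw the class definition; in the second it can import mymodule and compute the squares. With local[2] the workers run on the same machine, so an importable mymodule.py is typically enough; on a real cluster the module would also have to be made available to the executors.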

Regarding "python - Using PySpark's rdd.parallelize().map() on methods of self-implemented objects/classes", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/67140209/
