I have some objects that I would like to process in parallel, so I thought I would turn to pyspark.
Consider this example: a class whose objects hold a number i that can be squared with square():
class MyMathObject():
    def __init__(self, i):
        self.i = i

    def square(self):
        return self.i ** 2

print(MyMathObject(3).square())  # Test one instance with regular Python - works
I have also set up pyspark (in a Jupyter notebook), and now I want to compute the squares of 0 through 4 on my objects in parallel:
import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext("local[2]")
rdd = sc.parallelize([MyMathObject(i) for i in range(5)])
rdd.map(lambda obj: obj.square()).collect() # This fails
This does not work; it results in a very long error message that is mostly useless to me. It seems to be related to the way square() is called; I copied the full message at the end. Doing the same thing on plain integers, on the other hand, works fine:
rdd = sc.parallelize([i for i in range(5)])
rdd.map(lambda i: i**2).collect()
So there seems to be something wrong with how I create or operate on the objects, but I cannot track down the error.
Py4JJavaError                             Traceback (most recent call last)
 in 
      1 rdd = sc.parallelize([MyMathObject(i) for i in range(5)])
----> 2 rdd.map(lambda obj: obj.square()).collect()

/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/pyspark/rdd.py in collect(self)
    887         """
    888         with SCCallSiteSync(self.context) as css:
--> 889             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    890         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
    891

/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306

/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 2, 192.168.2.108, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 147, in load_stream
    yield self._read_with_length(stream)
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads
    return pickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute 'MyMathObject' on <module 'pyspark.daemon' from '/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/daemon.py'>

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2154)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2179)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:168)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 147, in load_stream
    yield self._read_with_length(stream)
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads
    return pickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute 'MyMathObject' on <module 'pyspark.daemon' from '/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/daemon.py'>

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2154)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Best Answer
It does not look like a mistake on your part, but one way to work around the problem is to put the class in a separate Python file (a module) and import it.
For example, create a file mymodule.py containing:
class MyMathObject():
    def __init__(self, i):
        self.i = i

    def square(self):
        return self.i ** 2
Then, in your main script, you can do:
from mymodule import MyMathObject
rdd = sc.parallelize([MyMathObject(i) for i in range(5)])
rdd.map(lambda obj: obj.square()).collect()
This should give [0, 1, 4, 9, 16].
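Why does this help? Spark serializes the MyMathObject instances with pickle, which records each object by the module its class was defined in; a class defined directly in the notebook exists only in that interactive session, so the worker's Python process cannot re-import it. That is exactly what the AttributeError: Can't get attribute 'MyMathObject' line in the traceback above is saying. If you later run against a real cluster instead of local[2], you may additionally need to ship the module file to the executors. A minimal sketch, assuming mymodule.py sits next to your notebook or driver script:
sc.addPyFile("mymodule.py")  # distribute the file so executor Python processes can import it
from mymodule import MyMathObject

rdd = sc.parallelize([MyMathObject(i) for i in range(5)])
print(rdd.map(lambda obj: obj.square()).collect())  # expected: [0, 1, 4, 9, 16]
In local mode the plain import is usually enough, since the driver and the executors share the same machine and working directory.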
Regarding "python - Using PySpark's rdd.parallelize().map() on functions of self-implemented objects/classes", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/67140209/