
apache-spark - Spark context shuts down when I parallelize a large list


When I create an RDD from a list in Spark, it frequently causes the Spark context to shut down as soon as I try to run an RDD operation on it.

Here is the code that causes the crash, followed by the stack trace. Any guidance is much appreciated!

import sys

import numpy as np
import pyspark

SC = pyspark.SparkContext("local", "Crash app")

for i in xrange(10):
    # Build an array of 10**i random floats and count it as an RDD.
    randArray = np.random.rand(10**i)

    randRdd = SC.parallelize(randArray)
    print "Size of the RDD is ", randRdd.count()
    sys.stdout.flush()

This produces the following stack trace:

Size of the RDD is 1
Size of the RDD is 10
Size of the RDD is 100
Size of the RDD is 1000
Size of the RDD is 10000
Size of the RDD is 100000
Size of the RDD is 1000000
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-3-7e69d839c2b5> in <module>()
4
5 randRdd = SC.parallelize(randArray)
----> 6 print "Size of the RDD is " + str(randRdd.count())
7 sys.stdout.flush()

/usr/local/spark/python/pyspark/rdd.pyc in count(self)
706 3
707 """
--> 708 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
709
710 def stats(self):

/usr/local/spark/python/pyspark/rdd.pyc in sum(self)
697 6.0
698 """
--> 699 return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
700
701 def count(self):

/usr/local/spark/python/pyspark/rdd.pyc in reduce(self, f)
617 if acc is not None:
618 yield acc
--> 619 vals = self.mapPartitions(func).collect()
620 return reduce(f, vals)
621

/usr/local/spark/python/pyspark/rdd.pyc in collect(self)
581 """
582 with _JavaStackTrace(self.context) as st:
--> 583 bytesInJava = self._jrdd.collect().iterator()
584 return list(self._collect_iterator_through_file(bytesInJava))
585

/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args)
535 answer = self.gateway_client.send_command(command)
536 return_value = get_return_value(answer, self.gateway_client,
--> 537 self.target_id, self.name)
538
539 for temp_arg in temp_args:

/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(

Py4JJavaError: An error occurred while calling o103.collect.
: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:639)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:638)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:638)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1215)
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
at akka.actor.ActorCell.terminate(ActorCell.scala:338)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.run(Mailbox.scala:218)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Best Answer

At 10,000,000 elements the array likely exceeds the memory limit. By default, Spark's parallelize splits the data into only 4 partitions, so each partition may still be too large to fit within the allowed memory. If we increase the number of partitions to, say, 10, by passing the count as an argument to parallelize, each partition may come within the limit and the job can run without error. This is one of the advantages of distributed programming. :) I hope I've explained this correctly; if not, please check and correct.
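A minimal sketch of the suggested fix (the partition count of 10 and the app name are illustrative choices, not tuned values):

import numpy as np
import pyspark

SC = pyspark.SparkContext("local", "Partitioned app")

# The same 10**7-element array that crashed above.
randArray = np.random.rand(10**7)

# numSlices asks Spark to split the data into 10 partitions instead of
# the default, so each partition holds roughly 1,000,000 elements.
randRdd = SC.parallelize(randArray, numSlices=10)
print "Size of the RDD is ", randRdd.count()

SC.stop()

With smaller partitions, each task only has to hold a slice of the data in memory at once.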

Regarding apache-spark - Spark context shuts down when I parallelize a large list, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/29824139/
