
r - "Job aborted due to stage failure" when using CreateDataFrame in SparkR


Following the instructions at https://spark.apache.org/docs/latest/sparkr.html#from-local-data-frames, I am creating a SparkDataFrame with the following code:

library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "spark://master:7077", sparkConfig = list(spark.cores.max="8", spark.executor.cores = "4"))
data(iris)
iris = createDataFrame(iris)
head(iris)

But the head call always fails with the error below. I get the same error when I run dim, and using as.DataFrame instead of createDataFrame makes no difference. I have also tried restarting the kernel of my IPython notebook and restarting my Spark session.

My understanding is that this is very basic SparkR functionality, so I really don't know why it isn't working. For some reason I have no problems when I read my SparkDataFrame directly from a data source with read.jdbc. I also noticed that the number in the error line "Task 0 in stage XXX ..." increments by 1 on every failure.
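
For comparison, a read.jdbc call along these lines works fine in the same session (the connection details below are placeholders rather than my real ones, and assume the JDBC driver jar is already on the Spark classpath):

# Reading straight from an external source never fails for me (placeholder URL/table/credentials)
df <- read.jdbc("jdbc:postgresql://dbhost:5432/mydb", "mytable",
                user = "dbuser", password = "dbpass")
head(df)   # works
dim(df)    # works too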

I also noticed that the error seems to come from the executors not being able to find the Rscript binary, although I'm not sure why that would only happen for SparkDataFrames created from local data.frames and not for ones pulled from an external data source.

Can anyone help me solve this problem?

The full error stack trace is:

Warning message in FUN(X[[i]], ...): “Use Sepal_Length instead of Sepal.Length as column name”
Warning message in FUN(X[[i]], ...): “Use Sepal_Width instead of Sepal.Width as column name”
Warning message in FUN(X[[i]], ...): “Use Petal_Length instead of Petal.Length as column name”
Warning message in FUN(X[[i]], ...): “Use Petal_Width instead of Petal.Width as column name”

Error in invokeJava(isStatic = TRUE, className, methodName, ...):
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 45.0 failed 4 times, most recent failure: Lost task 0.3 in stage 45.0 (TID 3372, 10.0.0.5): java.io.IOException: Cannot run program "Rscript": error=2, No such file or directory
  at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
  at org.apache.spark.api.r.RRunner$.createRProcess(RRunner.scala:348)
  at org.apache.spark.api.r.RRunner$.createRWorker(RRunner.scala:364)
  at org.apache.spark.api.r.RRunner.compute(RRunner.scala:69)
  at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:49)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
  at org.apache.spark.scheduler.Task.run(Task.scala:85)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: error=2, No such file or directory
  at java.lang.UNIXProcess.forkAndExec(Native Method)
  at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
  at java.lang.ProcessImpl.start(ProcessImpl.java:134)
  at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
  ... 24 more

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2182)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2187)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2187)
  at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2545)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2187)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2163)
  at org.apache.spark.sql.api.r.SQLUtils$.dfToCols(SQLUtils.scala:208)
  at org.apache.spark.sql.api.r.SQLUtils.dfToCols(SQLUtils.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:141)
  at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:86)
  at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:38)
  at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
  at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
  at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
  at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
  at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
  at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
  at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Cannot run program "Rscript": error=2, No such file or directory
  at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
  at org.apache.spark.api.r.RRunner$.createRProcess(RRunner.scala:348)
  at org.apache.spark.api.r.RRunner$.createRWorker(RRunner.scala:364)
  at org.apache.spark.api.r.RRunner.compute(RRunner.scala:69)
  at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:49)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark

Traceback:

  1. head(charEx)
  2. head(charEx)
  3. .local(x, ...)
  4. take(x, num)
  5. take(x, num)
  6. collect(limited)
  7. collect(limited)
  8. .local(x, ...)
  9. callJStatic("org.apache.spark.sql.api.r.SQLUtils", "dfToCols", x@sdf)
  10. invokeJava(isStatic = TRUE, className, methodName, ...)
  11. stop(readString(conn))

Best Answer

The way I understand it:

read.jdbc works because no R is needed on the nodes to carry out the operation: the driver (where R runs) translates the command into Spark operations, and only then is the work shipped to the worker nodes and executed there.

createDataFrame fails because it is shipped to the worker nodes as an R command, so the nodes need access to Rscript.
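
A quick way to check this reasoning (just a sketch): anything that pushes R code to the executors should fail with the same Cannot run program "Rscript" error, while SQL-only operations keep working.

# This ships an R function to the executors, so it needs Rscript on every worker;
# on your cluster it should die with the same IOException as createDataFrame/head.
spark.lapply(1:4, function(x) Sys.which("Rscript"))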

If you want to use createDataFrame to do a bit of processing on your data, I suggest using a local Spark session (otherwise you would have to install Rscript on your worker nodes); a sketch of both options follows below. If you need to push data through R just to get it into Spark in the first place, you may want to rethink the approach: presumably you are using Spark because you have a lot of data, and it is usually better to load and keep everything on the Spark side, pulling only aggregated chunks into in-memory R.
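
A minimal sketch of both options (the Rscript path below is an assumption you will need to adjust to your cluster, and spark.r.command is, as far as I know, the Spark setting that tells executors which R executable to launch):

# Option 1 (sketch): a local session, so tasks run on the same machine as the
# driver and use the driver's Rscript.
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]")
df <- createDataFrame(iris)
head(df)

# Option 2 (sketch, assumes R really is installed on every worker): if Rscript
# exists on the workers but is not on the PATH their executors see, point Spark
# at it explicitly; "/usr/local/bin/Rscript" is only an example path.
sparkR.session.stop()
sparkR.session(
  master = "spark://master:7077",
  sparkConfig = list(spark.r.command = "/usr/local/bin/Rscript")
)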

Regarding r - "Job aborted due to stage failure" when using CreateDataFrame in SparkR, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/44125809/
