
apache-spark - Why does the Spark job fail to write its output?


Setup:

I have a Spark job running on a distributed Spark cluster with 10 nodes. I am doing some text file processing on HDFS. The job runs fine until the very last step: saving the output as a text file.

Problem:

I get the following stack trace:

15/04/07 11:32:11 INFO spark.SparkContext: Job finished: saveAsTextFile at Main.java:235, took 1.377335791 s
Exception in thread "main" java.io.IOException: Failed to rename RawLocalFileStatus{path=file:/home/ds_myuser/tmp/myapp/out/_temporary/0/task_201504071132_0016_m_000003/part-00003; isDirectory=false; length=2494; replication=1; blocksize=33554432; modification_time=1428427931000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to file:/home/ds_myuser/tmp/myapp/out/part-00003
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:346)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
at org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:126)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:995)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:878)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:792)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1162)
at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:440)
at org.apache.spark.api.java.JavaPairRDD.saveAsTextFile(JavaPairRDD.scala:45)
at com.somecompany.analysis.myapp.control.Main.calculateRow(Main.java:235)
at com.somecompany.analysis.myapp.control.Main.calculatemyapp(Main.java:127)
at com.somecompany.analysis.myapp.control.Main.main(Main.java:103)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Also, this is how I save to the file in my Java code:

result.saveAsTextFile("/home/myuser/tmp/myapp/out");
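
A note on the path: it has no URI scheme, so Hadoop resolves it against the configured default filesystem, which is presumably why the stack trace above shows a file:/ location. A minimal sketch of making the target filesystem explicit (the namenode address and paths below are placeholders, not taken from the actual job):

// Explicit local-FS target: with file://, every executor writes its partitions to its OWN local disk.
result.saveAsTextFile("file:///home/myuser/tmp/myapp/out");

// Explicit HDFS target: all executors write into the same shared filesystem.
result.saveAsTextFile("hdfs://namenode:8020/user/myuser/myapp/out");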

Also, sometimes I get 1 part file in the output directory, and sometimes I get none.

Is it because I am trying to save to the local file system and there is a race condition, since all the executors try to write to the same location? The part file names are different, though, so I would think that should not be the problem.
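
If the goal is really to end up with the output on the local filesystem of the machine that launches the job, and the result is small, one workaround is to collect it on the driver and write it with plain Java I/O. This is only a sketch with a placeholder path, not the approach used in this job (the enclosing method is assumed to declare throws java.io.IOException):

// Pull all records back to the driver -- only safe when the result is small
// enough to fit in driver memory -- and write them on the driver's local disk.
java.util.List<?> records = result.collect();
try (java.io.PrintWriter out = new java.io.PrintWriter(
        new java.io.File("/home/myuser/tmp/myapp/out.txt"), "UTF-8")) {
    for (Object record : records) {
        out.println(record);
    }
}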

Any help is greatly appreciated.

Edit 1:

One more thing I noticed. Strangely, some of the temporary files are owned by "root" and I cannot delete them:

[myuser@myserver ~]$ rm -rf tmp/myapp/
rm: cannot remove `tmp/myapp/out/_temporary/0/task_201504061658_0016_m_000001/.part-00001.crc': Permission denied
rm: cannot remove `tmp/myapp/out/_temporary/0/task_201504061658_0016_m_000001/part-00001': Permission denied

Edit 2:

Following Marius Soutier's suggestion, I tried using coalesce, and I also tried repartition. With these changes the job succeeds, but in the output directory I only see the _SUCCESS file and no part-xxxxx files. Also, I am calling

result.count()

right before the coalesce/repartition, and it prints 260, so there is some final output. It just does not end up in the part files.
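
For what it is worth, the two variants should behave the same way, since repartition(n) is defined as coalesce(n, shuffle = true). A sketch with placeholder output paths:

// Equivalent ways to force a single output partition before writing.
result.coalesce(1, true).saveAsTextFile("hdfs:///user/myuser/out-coalesce");
result.repartition(1).saveAsTextFile("hdfs:///user/myuser/out-repartition");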

Edit 3:

Here is the code that writes to the file; it lives in the driver class:

System.out.println("Final No. of Output Lines: " + result.count());
result.coalesce(1, true).saveAsTextFile("file:///home/myuser/tmp3");

And here is the log after the count is printed:

Final No. of Output Lines: 260
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/04/09 11:30:07 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/04/09 11:30:07 INFO spark.SparkContext: Starting job: saveAsTextFile at Main.java:284
15/04/09 11:30:07 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 164 bytes
15/04/09 11:30:07 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 164 bytes
15/04/09 11:30:07 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 2 is 174 bytes
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Registering RDD 23 (coalesce at Main.java:284)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Got job 9 (saveAsTextFile at Main.java:284) with 1 output partitions (allowLocal=false)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Final stage: Stage 21(saveAsTextFile at Main.java:284)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 26)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Missing parents: List(Stage 26)
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Submitting Stage 26 (MapPartitionsRDD[23] at coalesce at Main.java:284), which has no missing parents
15/04/09 11:30:07 INFO storage.MemoryStore: ensureFreeSpace(22392) called with curMem=132730821, maxMem=5556637532
15/04/09 11:30:07 INFO storage.MemoryStore: Block broadcast_17 stored as values in memory (estimated size 21.9 KB, free 5.1 GB)
15/04/09 11:30:07 INFO storage.MemoryStore: ensureFreeSpace(11900) called with curMem=132753213, maxMem=5556637532
15/04/09 11:30:07 INFO storage.MemoryStore: Block broadcast_17_piece0 stored as bytes in memory (estimated size 11.6 KB, free 5.1 GB)
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode111.mydomain.com:34468 (size: 11.6 KB, free: 5.2 GB)
15/04/09 11:30:07 INFO storage.BlockManagerMaster: Updated info of block broadcast_17_piece0
15/04/09 11:30:07 INFO scheduler.DAGScheduler: Submitting 4 missing tasks from Stage 26 (MapPartitionsRDD[23] at coalesce at Main.java:284)
15/04/09 11:30:07 INFO scheduler.TaskSchedulerImpl: Adding task set 26.0 with 4 tasks
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 26.0 (TID 36, mynode117.mydomain.com, PROCESS_LOCAL, 1053 bytes)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 26.0 (TID 37, mynode112.mydomain.com, PROCESS_LOCAL, 1053 bytes)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 26.0 (TID 38, mynode115.mydomain.com, PROCESS_LOCAL, 1053 bytes)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 26.0 (TID 39, mynode119.mydomain.com, PROCESS_LOCAL, 1053 bytes)
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode115.mydomain.com:51126 (size: 11.6 KB, free: 2.1 GB)
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode117.mydomain.com:33052 (size: 11.6 KB, free: 2.1 GB)
15/04/09 11:30:07 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3 to sparkExecutor@mynode115.mydomain.com:34724
15/04/09 11:30:07 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3 to sparkExecutor@mynode117.mydomain.com:35651
15/04/09 11:30:07 INFO network.ConnectionManager: Accepted connection from [mynode112.mydomain.com/10.211.26.212:52476]
15/04/09 11:30:07 INFO network.SendingConnection: Initiating connection to [mynode112.mydomain.com/10.211.26.212:56453]
15/04/09 11:30:07 INFO network.SendingConnection: Connected to [mynode112.mydomain.com/10.211.26.212:56453], 1 messages pending
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode119.mydomain.com:39126 (size: 11.6 KB, free: 2.1 GB)
15/04/09 11:30:07 INFO storage.BlockManagerInfo: Added broadcast_17_piece0 in memory on mynode112.mydomain.com:56453 (size: 11.6 KB, free: 2.1 GB)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 26.0 (TID 36) in 356 ms on mynode117.mydomain.com (1/4)
15/04/09 11:30:07 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 26.0 (TID 38) in 362 ms on mynode115.mydomain.com (2/4)
15/04/09 11:30:08 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3 to sparkExecutor@mynode119.mydomain.com:42604
15/04/09 11:30:08 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3 to sparkExecutor@mynode112.mydomain.com:46239
15/04/09 11:30:08 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 26.0 (TID 37) in 796 ms on mynode112.mydomain.com (3/4)
15/04/09 11:30:08 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 26.0 (TID 39) in 829 ms on mynode119.mydomain.com (4/4)
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Stage 26 (coalesce at Main.java:284) finished in 0.835 s
15/04/09 11:30:08 INFO scheduler.DAGScheduler: looking for newly runnable stages
15/04/09 11:30:08 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 26.0, whose tasks have all completed, from pool
15/04/09 11:30:08 INFO scheduler.DAGScheduler: running: Set()
15/04/09 11:30:08 INFO scheduler.DAGScheduler: waiting: Set(Stage 21)
15/04/09 11:30:08 INFO scheduler.DAGScheduler: failed: Set()
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Missing parents for Stage 21: List()
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Submitting Stage 21 (MappedRDD[27] at saveAsTextFile at Main.java:284), which is now runnable
15/04/09 11:30:08 INFO storage.MemoryStore: ensureFreeSpace(53664) called with curMem=132765113, maxMem=5556637532
15/04/09 11:30:08 INFO storage.MemoryStore: Block broadcast_18 stored as values in memory (estimated size 52.4 KB, free 5.1 GB)
15/04/09 11:30:08 INFO storage.MemoryStore: ensureFreeSpace(19192) called with curMem=132818777, maxMem=5556637532
15/04/09 11:30:08 INFO storage.MemoryStore: Block broadcast_18_piece0 stored as bytes in memory (estimated size 18.7 KB, free 5.1 GB)
15/04/09 11:30:08 INFO storage.BlockManagerInfo: Added broadcast_18_piece0 in memory on mynode111.mydomain.com:34468 (size: 18.7 KB, free: 5.2 GB)
15/04/09 11:30:08 INFO storage.BlockManagerMaster: Updated info of block broadcast_18_piece0
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 21 (MappedRDD[27] at saveAsTextFile at Main.java:284)
15/04/09 11:30:08 INFO scheduler.TaskSchedulerImpl: Adding task set 21.0 with 1 tasks
15/04/09 11:30:08 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 21.0 (TID 40, mynode112.mydomain.com, ANY, 1353 bytes)
15/04/09 11:30:08 INFO storage.BlockManagerInfo: Added broadcast_18_piece0 in memory on mynode112.mydomain.com:56453 (size: 18.7 KB, free: 2.1 GB)
15/04/09 11:30:08 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 4 to sparkExecutor@mynode112.mydomain.com:46239
15/04/09 11:30:08 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 4 is 199 bytes
15/04/09 11:30:08 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 21.0 (TID 40) in 441 ms on mynode112.mydomain.com (1/1)
15/04/09 11:30:08 INFO scheduler.DAGScheduler: Stage 21 (saveAsTextFile at Main.java:284) finished in 0.447 s
15/04/09 11:30:08 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 21.0, whose tasks have all completed, from pool
15/04/09 11:30:08 INFO spark.SparkContext: Job finished: saveAsTextFile at Main.java:284, took 1.381897276 s
[myuser@mynode111 ~]$ ls tmp3/
_SUCCESS
[myuser@mynode111 ~]$

As a side note, I am doing the processing on HDFS and expecting the final output file, after saveAsTextFile(), on the local FS of the machine the application is launched from. I hope Spark is not writing it somewhere else (on some node's local FS).

Quick update:

I tried writing to HDFS instead of the local FS, and it works fine:

result.coalesce(1, true).saveAsTextFile("hdfs://mynode20.mydomain.com:8020/user/myuser/tmp");

Output:

[myuser@mynode111 ~]$ hadoop fs -ls /user/myuser/tmp
Found 2 items
-rw-r--r-- 3 myuser myuser 0 2015-04-09 11:53 /user/myuser/tmp/_SUCCESS
-rw-r--r-- 3 myuser myuser 12470 2015-04-09 11:53 /user/myuser/tmp/part-00000
[myuser@mynode111 ~]$
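
Since writing to HDFS works, one way to still get the file onto the local filesystem of the launching machine is to copy the committed output back afterwards, for example with hadoop fs -get on the command line or with the Hadoop FileSystem API from the driver. A sketch with placeholder local paths, reusing the namenode address from the example above (the enclosing method is assumed to declare throws java.io.IOException):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Copy the single part file from HDFS down to the local disk of the machine
// running this code, after the job has finished committing its output.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(
        java.net.URI.create("hdfs://mynode20.mydomain.com:8020"), conf);
fs.copyToLocalFile(
        new Path("/user/myuser/tmp/part-00000"),
        new Path("file:///home/myuser/tmp/part-00000"));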

Best answer

I ran into the same problem. It turned out that my Spark workers were running as the root user while my job ran as another user. When saveAsTextFile is called, the Spark workers first save the data to a temporary location on disk as root, and then the Spark job, running as a different user, tries to move the root-owned temporary data to its final location, which fails with a permission problem.

Regarding apache-spark - Why does the Spark job fail to write its output?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/29497887/
