gpt4 book ai didi

apache-spark - 执行程序失败后,Spark 无法在 HDFS 中找到检查点数据

转载 作者:行者123 更新时间:2023-12-04 15:55:06 26 4
gpt4 key购买 nike

我正在从 Kafka 发送数据,如下所示:

final JavaPairDStream<String, Row> transformedMessages = 


rtStream
.mapToPair(record -> new Tuple2<String, GenericDataModel>(record.key(), record.value()))
.mapWithState(StateSpec.function(updateDataFunc).numPartitions(32)).stateSnapshots()
.foreachRDD(rdd -> {
--logic goes here
});

这个应用程序有四个工作线程和多个执行程序,我正在尝试检查 Spark 的容错能力。

由于我们使用的是 mapWithState,spark 正在将数据检查点保存到 HDFS,因此如果任何执行程序/工作程序发生故障,我们应该能够恢复丢失的数据(死执行程序中丢失的数据),并继续使用剩余的执行程序/工作程序.

所以我杀死了一个工作节点以查看应用程序是否仍然顺利运行,但是我在 HDFS 中得到了一个 FileNotFound 异常,如下所示:

这有点奇怪,因为 Spark 在某个时候在 HDFS 中检查了数据,为什么它找不到它。显然 HDFS 没有删除任何数据,所以为什么会出现这个异常。

还是我在这里遗漏了什么?

[ERROR] 2018-08-21 13:07:24,067 org.apache.spark.streaming.scheduler.JobScheduler logError - Error running job streaming job 1534871220000 ms.2
org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/user1/sparkCheckpointData/2db59817-d954-41a7-9b9d-4ec874bc86de/rdd-1005/part-00000
java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/user1/sparkCheckpointData/2db59817-d954-41a7-9b9d-4ec874bc86de/rdd-1005/part-00000
at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1122)
at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
at org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:273)
at scala.Option.map(Option.scala:146)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:273)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1615)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1626)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1625)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1625)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1625)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1623)

进一步更新:我发现 Spark 试图在 HDFS 中查找的 RDD 已被“ReliableRDDCheckpointData”进程删除,并为检查点数据创建了一个新的 RDD。DAG 以某种方式指向这个旧的 RDD。如果有任何对这些数据的引用,它不应该被删除。

最佳答案

考虑 Spark 流上的这个转换管道:

rtStream
.mapToPair(record -> new Tuple2<String, GenericDataModel>(record.key(), record.value()))
.mapWithState(StateSpec.function(updateDataFunc).numPartitions(32)).stateSnapshots()
.foreachRDD(rdd -> {
if(counter ==1){
--convert RDD to Dataset, and register it as a SQL table names "InitialDataTable"
} else
--convert RDD to Dataset, and register it as a SQL table names "ActualDataTable"


});

mapWithState关联的是每batch之后状态数据的自动checkpoint,所以上面的“forEachRdd”block中的每个“rdd”都会被checkpoint,并且在checkpoint的同时,会覆盖之前的checkpoint(因为很明显最新的state需要留在检查站)

但是假设用户仍在使用 rdd 编号 1,就像在我的情况下,我将第一个 rdd 注册为不同的表,并将所有其他 rdd 注册为不同的表,那么它不应该被覆盖。 (在 Java 中也是如此,如果某物引用了对象引用,则该对象将不符合垃圾回收条件)

现在,当我尝试访问“InitialDataTable”表时,显然用于创建此表的“rdd”已不在内存中,因此它将转到 HDFS 从检查点恢复它,但它不会找到它也在那里,因为它被下一个 rdd 覆盖,并且 spark 应用程序停止引用原因。

“org.apache.spark.SparkException:作业因阶段失败而中止:任务创建失败:java.io.FileNotFoundException:文件不存在:hdfs://mycluster/user/user1/sparkCheckpointData/2db59817-d954- 41a7-9b9d-4ec874bc86de/rdd-1005/part-00000"

因此,为了解决这个问题,我必须明确检查第一个 rdd。

关于apache-spark - 执行程序失败后,Spark 无法在 HDFS 中找到检查点数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52059382/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com