
apache-spark - MapWithState gives java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast while recovering from checkpoint

Reposted. Author: 行者123. Updated: 2023-12-03 11:09:34

I am running into a problem with a Spark Streaming job when trying to use broadcast, mapWithState, and checkpointing together in Spark.

The usage is as follows:

  • Because I have to pass some connection objects (which are not serializable) to the executors, I use org.apache.spark.broadcast.Broadcast
  • Because we have to maintain some cached information, I use a stateful stream with mapWithState
  • I am also checkpointing my streaming context

I also need to pass the broadcast connection object into mapWithState in order to fetch some data from an external source.
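For context, a common way to "broadcast" a non-serializable connection (and, as far as I can tell, roughly what the KafkaWriter in the linked repo does) is to broadcast a small serializable wrapper that keeps the connection in a transient field and re-creates it lazily on each executor. A minimal sketch with hypothetical Connection/ConnectionWrapper classes; the serialization round-trip stands in for shipping the wrapper to an executor:

```java
import java.io.*;

// Hypothetical stand-in for a non-serializable connection (e.g. a Kafka producer).
class Connection {
    boolean open = true;
}

// Serializable wrapper: the connection itself is transient, so it is not
// shipped over the wire; it is re-created lazily per JVM on first use.
class ConnectionWrapper implements Serializable {
    private transient Connection conn;

    synchronized Connection get() {
        if (conn == null) {
            conn = new Connection(); // re-establish on the executor side
        }
        return conn;
    }
}

public class Main {
    public static void main(String[] args) throws Exception {
        ConnectionWrapper w = new ConnectionWrapper();
        w.get(); // connection created on the "driver"

        // Simulate shipping the wrapper to an executor via Java serialization.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(w);
        ConnectionWrapper shipped = (ConnectionWrapper) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        // The transient field did not travel; it is rebuilt lazily on access.
        System.out.println(shipped.get().open);
    }
}
```

This compiles even though Connection is not Serializable, because transient fields are skipped during serialization.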

The flow works fine when the context is created fresh. However, when I crash the application and try to recover from the checkpoint, I get a ClassCastException.

I put a small code snippet, based on an example from asyncified.io, on GitHub to reproduce the problem:

  • My broadcast logic is in yuvalitzchakov.utils.KafkaWriter.scala
  • The dummy logic of the application is in yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast.scala

A trimmed-down snippet of the code:

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark-stateful-example")

...
val prop = new Properties()
...

val config: Config = ConfigFactory.parseString(prop.toString)
val sc = new SparkContext(sparkConf)
val ssc = StreamingContext.getOrCreate(checkpointDir, () => {

  println("creating context newly")

  clearCheckpoint(checkpointDir)

  val streamingContext = new StreamingContext(sc, Milliseconds(batchDuration))
  streamingContext.checkpoint(checkpointDir)

  ...
  val kafkaWriter = SparkContext.getOrCreate().broadcast(kafkaErrorWriter)
  ...
  val stateSpec = StateSpec.function((key: Int, value: Option[UserEvent], state: State[UserSession]) =>
      updateUserEvents(key, value, state, kafkaWriter))
    .timeout(Minutes(jobConfig.getLong("timeoutInMinutes")))

  kafkaTextStream
    .transform(rdd => {
      offsetsQueue.enqueue(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
      rdd
    })
    .map(deserializeUserEvent)
    .filter(_ != UserEvent.empty)
    .mapWithState(stateSpec)
    .foreachRDD { rdd =>
      ...
      some logic
      ...
    }

  streamingContext
})

ssc.start()
ssc.awaitTermination()


def updateUserEvents(key: Int,
                     value: Option[UserEvent],
                     state: State[UserSession],
                     kafkaWriter: Broadcast[KafkaWriter]): Option[UserSession] = {

  ...
  kafkaWriter.value.someMethodCall()
  ...
}

I get the following error when kafkaWriter.value.someMethodCall() executes:

17/08/01 21:20:38 ERROR Executor: Exception in task 2.0 in stage 3.0 (TID 4)
java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to yuvalitzchakov.utils.KafkaWriter
at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$.updateUserSessions$1(SparkStatefulRunnerWithBroadcast.scala:144)
at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$.updateUserEvents(SparkStatefulRunnerWithBroadcast.scala:150)
at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$$anonfun$2.apply(SparkStatefulRunnerWithBroadcast.scala:78)
at yuvalitzchakov.stateful.SparkStatefulRunnerWithBroadcast$$anonfun$2.apply(SparkStatefulRunnerWithBroadcast.scala:77)
at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:181)
at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Basically, kafkaWriter is the broadcast variable and kafkaWriter.value should return the broadcast value, but after recovery it returns a SerializableConfiguration instead, which cannot be cast to the expected KafkaWriter object.
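The failure mode itself is the generic one: after deserialization, a slot that should hold one type actually contains an instance of a different runtime class, and the cast blows up at use time rather than at deserialization time. A trivial Java illustration (here java.util.Properties stands in for SerializableConfiguration, and String for the expected type):

```java
public class Main {
    public static void main(String[] args) {
        // The recovered checkpoint hands back an object of the wrong runtime class.
        Object recovered = new java.util.Properties();

        try {
            // Compiles fine (downcast from Object), fails only at runtime.
            String writer = (String) recovered;
            System.out.println(writer);
        } catch (ClassCastException e) {
            System.out.println("cast failed: " + e.getMessage());
        }
    }
}
```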

Thanks in advance for any help!

Best Answer

Broadcast variables cannot be used with mapWithState (or with transformation operations in general) if we need to recover from the checkpoint directory in Spark Streaming. In that case they can only be used inside output operations, because instantiating the broadcast lazily requires the Spark context:

class JavaWordBlacklist {

  private static volatile Broadcast<List<String>> instance = null;

  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaWordBlacklist.class) {
        if (instance == null) {
          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
          instance = jsc.broadcast(wordBlacklist);
        }
      }
    }
    return instance;
  }
}

class JavaDroppedWordsCounter {

  private static volatile LongAccumulator instance = null;

  public static LongAccumulator getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaDroppedWordsCounter.class) {
        if (instance == null) {
          instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
        }
      }
    }
    return instance;
  }
}

wordCounts.foreachRDD((rdd, time) -> {
  // Get or register the blacklist Broadcast
  Broadcast<List<String>> blacklist =
      JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
  // Get or register the droppedWordsCounter Accumulator
  LongAccumulator droppedWordsCounter =
      JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
  // Use blacklist to drop words and use droppedWordsCounter to count them
  String counts = rdd.filter(wordCount -> {
    if (blacklist.value().contains(wordCount._1())) {
      droppedWordsCounter.add(wordCount._2());
      return false;
    } else {
      return true;
    }
  }).collect().toString();
  String output = "Counts at time " + time + " " + counts;
});
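The get-or-create pattern above can be exercised outside Spark. A minimal, self-contained Java sketch (with a plain List standing in for the Broadcast, and an initCount field added purely for illustration) shows that the double-checked locking initializes the instance exactly once and returns the same object on every call:

```java
import java.util.Arrays;
import java.util.List;

class WordBlacklist {
    private static volatile List<String> instance = null;
    static int initCount = 0; // for illustration: counts initializations

    static List<String> getInstance() {
        if (instance == null) {                     // first check, no lock
            synchronized (WordBlacklist.class) {
                if (instance == null) {             // second check, under lock
                    initCount++;
                    instance = Arrays.asList("a", "b", "c");
                }
            }
        }
        return instance;
    }
}

public class Main {
    public static void main(String[] args) {
        List<String> first = WordBlacklist.getInstance();
        List<String> second = WordBlacklist.getInstance();
        System.out.println(first == second);          // same instance reused
        System.out.println(WordBlacklist.initCount);  // initialized once
    }
}
```

On checkpoint recovery this matters because the restored output-operation closure calls getInstance again with the live context, re-registering the broadcast instead of relying on a stale reference deserialized from the checkpoint.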

Regarding "apache-spark - MapWithState gives java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast while recovering from checkpoint", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/45443610/
