gpt4 book ai didi

java - 使用 Spark 驱动程序中的 Java native readObject 进行反序列化时出现 ClassCastException

转载 作者:太空宇宙 更新时间:2023-11-04 10:42:54 24 4
gpt4 key购买 nike

我有两个 Spark 作业 A 和 B,因此 A 必须在 B 之前运行。A 的输出必须可从以下位置读取:

  • Spark 作业 B
  • Spark 环境之外的独立 Scala 程序(不依赖 Spark)

我目前正在使用 Java 的 native 序列化和 Scala 案例类。

来自 A Spark 作业:

val model = ALSFactorizerModel(...)

context.writeSerializable(resultOutputPath, model)

使用序列化方法:

def writeSerializable[T <: Serializable](path: String, obj: T): Unit = {
val writer: OutputStream = ... // Google Cloud Storage dependant
val oos: ObjectOutputStream = new ObjectOutputStream(writer)
oos.writeObject(obj)
oos.close()
writer.close()
}

来自 B Spark 作业或任何独立的非 Spark Scala 代码:

val lastFactorizerModel: ALSFactorizerModel = context
.readSerializable[ALSFactorizerModel](ALSFactorizer.resultOutputPath)

使用反序列化方法:

def readSerializable[T <: Serializable](path: String): T = {
val is : InputStream = ... // Google Cloud Storage dependant
val ois = new ObjectInputStream(is)
val model: T = ois
.readObject()
.asInstanceOf[T]
ois.close()
is.close()

model
}

(嵌套)案例类:

ALSFactorizer模型:

package mycompany.algo.als.common.io.model.factorizer

import mycompany.data.item.ItemStore

@SerialVersionUID(1L)
final case class ALSFactorizerModel(
knownItems: Array[ALSFeaturedKnownItem],
unknownItems: Array[ALSFeaturedUnknownItem],
rank: Int,
modelTS: Long,
itemRepositoryTS: Long,
stores: Seq[ItemStore]
) {
}

元素商店:

package mycompany.data.item

@SerialVersionUID(1L)
final case class ItemStore(
id: String,
tenant: String,
name: String,
index: Int
) {
}

输出:

  • 来自独立的非 Spark Scala 程序 => 确定
  • 来自在我的开发计算机上本地运行的 B Spark 作业(Spark 独立本地节点)=> 确定
  • 来自 (Dataproc) Spark 集群上运行的 B Spark 作业 => 失败并出现以下异常:

异常(exception):

java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field mycompany.algo.als.common.io.model.factorizer.ALSFactorizerModel.stores of type scala.collection.Seq in instance of mycompany.algo.als.common.io.model.factorizer.ALSFactorizerModel
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at mycompany.fs.gcs.SimpleGCSFileSystem.readSerializable(SimpleGCSFileSystem.scala:71)
at mycompany.algo.als.batch.strategy.ALSClusterer$.run(ALSClusterer.scala:38)
at mycompany.batch.SinglePredictorEbapBatch$$anonfun$3.apply(SinglePredictorEbapBatch.scala:55)
at mycompany.batch.SinglePredictorEbapBatch$$anonfun$3.apply(SinglePredictorEbapBatch.scala:55)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

我错过了什么吗?我是否应该配置 Dataproc/Spark 以支持对此代码使用 Java 序列化?

我使用 --jars <path to my fatjar> 提交作业并且之前从未遇到过其他问题。 Spark依赖不包含在这个Jar中,范围是Provided .

Scala 版本:2.11.8Spark版本:2.0.2SBT版本:0.13.13

感谢您的帮助

最佳答案

stores: Array[ItemStore] 替换 stores: Seq[ItemStore] 已经为我们解决了这个问题。

或者,我们可以使用另一个类加载器来进行序列化/反序列化操作。

希望这会有所帮助。

关于java - 使用 Spark 驱动程序中的 Java native readObject 进行反序列化时出现 ClassCastException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48745465/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com