
java - Spark UDF - ClassCastException with JSON

Reposted. Author: 行者123. Updated: 2023-11-30 05:39:38

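I am running Java Spark code that reads some JSON data and converts one of the fields to uppercase through a UDF. The code works fine in local mode, but when it runs on a cluster (under Kubernetes) I get the ClassCastException shown below: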

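import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Register the UDF under a name usable from callUDF (UdfUppercase is my own UDF1 class, not shown).
UDF1 uppercase = new UdfUppercase();
session.udf().register("uppercasefunction", uppercase, DataTypes.StringType);

// Explicit schema for the JSON records.
StructField[] structFields = new StructField[]{
    new StructField("intColumn", DataTypes.IntegerType, true, Metadata.empty()),
    new StructField("stringColumn", DataTypes.StringType, true, Metadata.empty())
};
StructType structType = new StructType(structFields);

List<String> jsonData = ImmutableList.of(
    "{\"intColumn\":1,\"stringColumn\":\"Miami\"}");

// Parse the JSON strings into rows using the schema above.
Dataset<String> anotherPeopleDataset = session.createDataset(jsonData, Encoders.STRING());
Dataset<Row> anotherPeople = session.read().schema(structType).json(anotherPeopleDataset);
anotherPeople.show(false);

// Apply the registered UDF to stringColumn.
Dataset<Row> dfupercase = anotherPeople.select(callUDF("uppercasefunction", col("stringColumn")));
dfupercase.show(false);

Exception thrown on the cluster:

Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD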
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Any help would be greatly appreciated.

Best answer

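The problem is now solved. It was caused by a jar conflict involving Spring Boot. The ClassCastException was misleading: it gave us the impression that our DataFrame was not being serialized correctly (a difference between local and cluster mode), when in fact it had nothing to do with the code itself and came from the jar conflict.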
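
Since the answer does not say how the conflict was tracked down, here is one possible way to look for this kind of problem: print which jar a suspicious class was actually loaded from and compare the result between the local run and the cluster run. This is only a sketch. The class JarLocator and its methods are hypothetical names introduced for illustration, and the class names passed to it in main are simply taken from the stack trace above; it is not the author's actual fix.

```java
import java.net.URL;
import java.security.CodeSource;

/** Hypothetical diagnostic helper (not part of Spark): reports where a class was loaded from. */
final class JarLocator {

    /** Returns the jar or directory that supplied the class, or a marker if it is unknown. */
    static String locationOf(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        if (source == null) {
            // Typically the case for classes loaded by the JDK bootstrap loader.
            return "<bootstrap or unknown>";
        }
        URL location = source.getLocation();
        return location == null ? "<bootstrap or unknown>" : location.toString();
    }

    /** Same as locationOf, but looks the class up by name and tolerates missing classes. */
    static String locationOfName(String className) {
        try {
            return locationOf(Class.forName(className));
        } catch (ClassNotFoundException | LinkageError e) {
            return "<not on classpath: " + className + ">";
        }
    }

    public static void main(String[] args) {
        // Class names taken from the stack trace; adjust to whatever looks suspicious.
        String[] names = {
            "scala.collection.immutable.List",
            "org.apache.spark.rdd.RDD",
            "JarLocator"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locationOfName(name));
        }
    }
}
```

If the same class name resolves to different jars (or different versions of a jar) on the driver and on the executors, that would be consistent with the kind of conflict described in the answer.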

Regarding java - Spark UDF - ClassCastException with JSON, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/55907052/
