
java - Spark streaming error reading from disk - java.io.NotSerializableException: org.apache.spark.streaming.api.java.JavaStreamingContext


 Caused by: java.io.NotSerializableException: org.apache.spark.streaming.api.java.JavaStreamingContext
Serialization stack:
- object not serializable (class: org.apache.spark.streaming.api.java.JavaStreamingContext, value: org.apache.spark.streaming.api.java.JavaStreamingContext@694c3f17)
- field (class: com.emc.network.sparkanalysis.logic.ProcessVideoStreamData$2, name: val$jssc, type: class org.apache.spark.streaming.api.java.JavaStreamingContext)
- object (class com.emc.network.sparkanalysis.logic.ProcessVideoStreamData$2, com.emc.network.sparkanalysis.logic.ProcessVideoStreamData$2@45abf747)
- field (class: com.emc.network.sparkanalysis.logic.ProcessVideoStreamData$2$1, name: this$1, type: class com.emc.network.sparkanalysis.logic.ProcessVideoStreamData$2)
- object (class com.emc.network.sparkanalysis.logic.ProcessVideoStreamData$2$1, com.emc.network.sparkanalysis.logic.ProcessVideoStreamData$2$1@5b02d2b0)
- field (class: org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1, name: f$3, type: interface org.apache.spark.api.java.function.FlatMapFunction)
- object (class org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)

I am getting the error at this line:

JavaRDD<String> words = javarddPerIp.flatMap(new FlatMapFunction<String, String>()
Here is the surrounding code:
JavaPairDStream<String, String> ones = stream
        .mapToPair(new PairFunction<String, String, String>() {
            @Override
            public Tuple2<String, String> call(String s) {
                String[] lineArr = SPACE.split(s);
                return new Tuple2<String, String>(lineArr[0], s);
            }
        });

JavaPairDStream<String, Iterable<String>> ipmapping = ones.groupByKey();

ipmapping
        .foreachRDD(new Function2<JavaPairRDD<String, Iterable<String>>, Time, Void>() {
            // jssc is captured here from the enclosing method; it becomes the
            // val$jssc field shown in the serialization stack above.
            JavaRDD<String> finalLogs = jssc.sparkContext().emptyRDD();

            @Override
            public Void call(JavaPairRDD<String, Iterable<String>> v1, Time v2) throws Exception {

                JavaRDD<Iterable<String>> stringValues = v1.values();
                List<Iterable<String>> stringList = stringValues.collect();

                for (Iterable<String> it : stringList) {
                    List<String> rddlist = new ArrayList<String>();
                    Iterator<String> values = it.iterator();
                    while (values.hasNext()) {
                        rddlist.add(values.next());
                    }

                    JavaRDD<String> javarddPerIp = jssc.sparkContext().parallelize(rddlist);

                    final Long numberOfrows;
                    numberOfrows = javarddPerIp.count();

                    System.out.println("javarddPerIp.count()" + javarddPerIp.count());

                    // This anonymous FlatMapFunction is nested inside the Function2 above,
                    // so serializing it also pulls in the outer anonymous class and jssc.
                    JavaRDD<String> words = javarddPerIp
                            .flatMap(new FlatMapFunction<String, String>() {
                                @Override
                                public Iterable<String> call(String s) {
                                    String[] splitstring = SPACE.split(s);
                                    Double emcbitrate = Double.parseDouble(splitstring[20])
                                            / Double.parseDouble(splitstring[30]);
                                    StringBuffer sf = new StringBuffer();
                                    sf.append(emcbitrate.toString())
                                            .append(SPACE)
                                            .append(splitstring[44])
                                            .append(SPACE)
                                            .append(splitstring[51]);
                                    return Arrays.asList(s + SPACE + emcbitrate.toString());
                                }
                            });

Best answer

In the end, the post below helped me solve this: remove the anonymous inner functions and move their implementations into separate classes. Anonymous inner classes keep a reference to their enclosing instance, and here that chain ends at the captured jssc field, so Spark tried to serialize the JavaStreamingContext itself; standalone classes break that reference:

org.apache.spark.SparkException: Task not serializable
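
As a minimal sketch of that fix, assuming the same single-space delimiter and field positions as in the question (the class name BitrateFlatMapFunction is hypothetical): the work of the anonymous FlatMapFunction moves into its own top-level class, which carries no hidden reference to the enclosing object or to the JavaStreamingContext, so the closure serializes cleanly.

import java.util.Arrays;
import java.util.regex.Pattern;

import org.apache.spark.api.java.function.FlatMapFunction;

// Hypothetical standalone class: it is not an anonymous inner class, so it holds no
// this$0 reference back to the driver-side code and no val$jssc field, and Spark can
// serialize it and ship it to the executors.
public class BitrateFlatMapFunction implements FlatMapFunction<String, String> {

    private static final Pattern SPACE = Pattern.compile(" ");

    @Override
    public Iterable<String> call(String s) {
        String[] splitstring = SPACE.split(s);
        // Same arithmetic as in the question: field 20 divided by field 30.
        Double emcbitrate = Double.parseDouble(splitstring[20])
                / Double.parseDouble(splitstring[30]);
        return Arrays.asList(s + " " + emcbitrate.toString());
    }
}

The call site then becomes JavaRDD<String> words = javarddPerIp.flatMap(new BitrateFlatMapFunction()); and nothing from the enclosing foreachRDD closure is captured. (This sketch targets the Spark 1.x Java API, where FlatMapFunction.call returns an Iterable; in Spark 2.x it returns an Iterator.)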

Regarding "java - Spark streaming error reading from disk - java.io.NotSerializableException: org.apache.spark.streaming.api.java.JavaStreamingContext", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/34760888/
