
java - Spark Streaming checkpoint exception

Reposted. Author: 行者123. Updated: 2023-12-01 11:15:17

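My Spark Streaming pipeline is integrated with Kafka, and I have also configured checkpointing. To test resilience, I manually killed the job and then restarted it, and got the following exception: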

Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.dstream.ShuffledDStream@1d304ac has not been initialized
at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:267)

The code I am using:

JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {

    public JavaStreamingContext create() {
        final SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        sparkConf.setMaster("local[2]");

        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); // new context

        jssc.checkpoint("D:\\Checkpoint");
        return jssc;
    }
};

// Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext jssc = JavaStreamingContext.getOrCreate("D:\\Checkpoint", contextFactory);
int numThreads = Integer.parseInt(1+"");

Please point out what I am doing wrong.

Best answer

private static JavaStreamingContext createContext() {
    final SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
    sparkConf.setMaster("local[2]");

    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); // new context

    jssc.checkpoint("D:\\Checkpoint");
    return jssc;
}

// Function0 is org.apache.spark.api.java.function.Function0
Function0<JavaStreamingContext> createContextFunc = new Function0<JavaStreamingContext>() {
    @Override
    public JavaStreamingContext call() {
        return createContext();
    }
};

JavaStreamingContext streamingContext = JavaStreamingContext.getOrCreate("D:\\Checkpoint", createContextFunc);

Using Function0 worked for me.
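Independently of which factory type is passed in, the Spark Streaming programming guide's recovery example sets up the DStreams inside the function given to getOrCreate: that function runs only when no checkpoint exists, and on restart the context and its DStream graph are rebuilt from the checkpoint instead. The question's snippet stops right after getOrCreate; if the Kafka stream and its transformations are defined at that point, outside the factory, then on restart they are added to an already-restored graph and are never initialized, which is consistent with the "has not been initialized" error. The sketch below is a Spark-free toy model of just that behaviour. Every class and method in it (ToyStream, ToyContext, getOrCreate, restartIsHealthy) is hypothetical and only mimics the contract described above; it is not Spark's API, and real recovery does much more.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Toy, Spark-free model of checkpoint recovery. All names here are
// hypothetical stand-ins, not Spark APIs.
public class CheckpointRecoveryModel {

    // Stand-in for a DStream: unusable until initialize() sets its zero time.
    static final class ToyStream {
        final String name;
        Long zeroTime; // stays null until initialize() is called

        ToyStream(String name) { this.name = name; }

        void initialize(long time) { zeroTime = time; }

        boolean isInitialized() { return zeroTime != null; }
    }

    // Stand-in for a streaming context together with its DStream graph.
    static final class ToyContext {
        final List<ToyStream> graph = new ArrayList<>();
        final boolean restored;

        ToyContext(boolean restored) { this.restored = restored; }

        // Defining a stream registers it in this context's graph.
        ToyStream defineStream(String name) {
            ToyStream s = new ToyStream(name);
            graph.add(s);
            return s;
        }

        // A first start initializes every stream; a restart trusts the
        // checkpointed state and initializes nothing.
        void start(long time) {
            if (!restored) {
                for (ToyStream s : graph) s.initialize(time);
            }
        }

        // Rebuilds the graph from "checkpointed" state, keeping each zero time.
        ToyContext restore() {
            ToyContext copy = new ToyContext(true);
            for (ToyStream s : graph) {
                ToyStream r = new ToyStream(s.name);
                r.zeroTime = s.zeroTime;
                copy.graph.add(r);
            }
            return copy;
        }
    }

    // Stand-in for the checkpoint directory: path -> saved context.
    static final Map<String, ToyContext> CHECKPOINTS = new HashMap<>();

    // getOrCreate-like semantics: the factory runs only when no checkpoint exists.
    static ToyContext getOrCreate(String dir, Supplier<ToyContext> factory) {
        ToyContext saved = CHECKPOINTS.get(dir);
        if (saved != null) {
            return saved.restore(); // recovery path: the factory is skipped
        }
        ToyContext fresh = factory.get();
        CHECKPOINTS.put(dir, fresh);
        return fresh;
    }

    // Simulates "start, kill, restart" and reports whether every stream in
    // the restarted graph is initialized.
    public static boolean restartIsHealthy(boolean defineStreamsInFactory) {
        CHECKPOINTS.clear();
        Supplier<ToyContext> factory = () -> {
            ToyContext c = new ToyContext(false);
            if (defineStreamsInFactory) c.defineStream("wordCounts");
            return c;
        };

        ToyContext first = getOrCreate("D:\\Checkpoint", factory);
        if (!defineStreamsInFactory) first.defineStream("wordCounts");
        first.start(1000L);

        // The job is killed; on restart the checkpoint already exists.
        ToyContext second = getOrCreate("D:\\Checkpoint", factory);
        if (!defineStreamsInFactory) second.defineStream("wordCounts");
        second.start(2000L);

        return second.graph.stream().allMatch(ToyStream::isInitialized);
    }

    public static void main(String[] args) {
        System.out.println("defined outside factory, healthy after restart: " + restartIsHealthy(false));
        System.out.println("defined inside factory, healthy after restart: " + restartIsHealthy(true));
    }
}
```

Running main prints false for the first line and true for the second: in the toy model, only streams defined inside the factory come back initialized after a restart. In the Spark version this would mean moving the Kafka stream creation and its transformations into createContext(), before it returns the context.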

A similar question, java - Spark Streaming checkpoint exception, can be found on Stack Overflow: https://stackoverflow.com/questions/31905831/
