gpt4 book ai didi

scala - 如何同时使用 SparkSession 和 StreamingContext?

转载 作者:行者123 更新时间:2023-12-01 01:46:35 27 4
gpt4 key购买 nike

我正在尝试从本地计算机 (OSX) 上的文件夹中流式传输 CSV 文件。我将 SparkSession 和 StreamingContext 一起使用,如下所示:

val sc: SparkContext = createSparkContext(sparkContextName)
val sparkSess = SparkSession.builder().config(sc.getConf).getOrCreate()
val ssc = new StreamingContext(sparkSess.sparkContext, Seconds(time))

val csvSchema = new StructType().add("field_name",StringType)
val inputDF = sparkSess.readStream.format("org.apache.spark.csv").schema(csvSchema).csv("file:///Users/userName/Documents/Notes/MoreNotes/tmpFolder/")

如果我运行 ssc.start()在此之后,我收到此错误:
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute

相反,如果我尝试启动 SparkSession像这样:
inputDF.writeStream.format("console").start()

我得到:
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.

显然我不明白如何 SparkSessionStreamingContext应该一起工作。如果我摆脱 SparkSession , StreamingContext只有 textFileStream我需要在其上施加 CSV 模式。希望得到有关如何使其工作的任何澄清。

最佳答案

您不能同时进行 spark session 和 spark 上下文。随着 Spark 2.0.0 的发布,开发人员可以使用一个新的抽象——Spark Session——它可以像以前可用的 Spark Context 一样被实例化和调用。

您仍然可以从 spark session 构建器访问 spark 上下文:

 val sparkSess = SparkSession.builder().appName("My App").getOrCreate()
val sc = sparkSess.sparkContext
val ssc = new StreamingContext(sc, Seconds(time))

导致您的工作失败的另一件事是您正在执行转换并且没有调用任何操作。最后应该调用一些 Action ,例如 inputDF.show()

关于scala - 如何同时使用 SparkSession 和 StreamingContext?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49307317/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com