gpt4 book ai didi

scala - 流式源的查询必须使用 writeStream.start() 执行;

转载 作者:行者123 更新时间:2023-12-01 16:13:58 24 4
gpt4 key购买 nike

我正在尝试在 Spark 中读取来自 kafka(版本 10)的消息并尝试打印它。

     import spark.implicits._

val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.config("spark.master", "local")
.getOrCreate()

val ds1 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topicA")
.load()

ds1.collect.foreach(println)
ds1.writeStream
.format("console")
.start()

ds1.printSchema()

在线程“main”中出现错误异常

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

最佳答案

您正在对查询计划进行分支:从您尝试执行的同一个 ds1 开始:

  • ds1.collect.foreach(...)
  • ds1.writeStream.format(...){...}

但是您只在第二个分支上调用 .start() ,而使另一个分支悬空而没有终止,这反过来会引发您返回的异常。

解决方案是启动两个分支并等待终止。

val ds1 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topicA")
.load()
val query1 = ds1.collect.foreach(println)
.writeStream
.format("console")
.start()
val query2 = ds1.writeStream
.format("console")
.start()

ds1.printSchema()
query1.awaitTermination()
query2.awaitTermination()

关于scala - 流式源的查询必须使用 writeStream.start() 执行;,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40609771/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com