gpt4 book ai didi

spark-structured-streaming - Spark 结构化流异常处理

转载 作者:行者123 更新时间:2023-12-02 17:02:14 25 4
gpt4 key购买 nike

我使用 Spark Structured Streaming API 从 MQTT 流源读取数据。

val lines:= spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("topic", "Employee")
.option("username", "username")
.option("password", "passwork")
.option("clientId", "employee11")
.load("tcp://localhost:8000").as[(String, Timestamp)]

我将流数据转换为案例类 Employee

case class Employee(Name: String, Department: String)    
val ds = lines.map {
row =>
implicit val format = DefaultFormats
parse(row._1).extract[Employee]
}
....some transformations
df.writeStream
.outputMode("append")
.format("es")
.option("es.resource", "spark/employee")
.option("es.nodes", "localhost")
.option("es.port", 9200)
.start()
.awaitTermination()

现在队列中有一些消息与 Employee 案例类具有不同的结构。假设缺少一些必需的列。我的流式处理作业因未找到字段异常而失败。

现在我想处理这样的异常,也想发送同样的警报通知。我尝试放置一个 try/catch block 。

case class ErrorMessage(row: String)        
catch {

case e: Exception =>
val ds = lines.map {
row =>
implicit val format = DefaultFormats
parse(row._1).extract[ErrorMessage]
}
val error = lines.foreach(row => {
sendErrorMail(row._1)
})
}
}

得到异常 线程“main”中的异常 org.apache.spark.sql.AnalysisException:必须使用 writeStream.start() 执行流源查询;;
MQTT
对此的任何帮助将不胜感激。

最佳答案

我认为您应该使用 start() 方法的返回对象,如 Spark streaming doc 中所述.像这样的东西:

val query = df.writeStream. ... .start()
try {
//If the query has terminated with an exception, then the exception will be thrown.
query.awaitTermination()
catch {
case ex: Exception => /*code to send mail*/
}

实现自己的 foreach sink 会导致频繁打开和关闭连接的开销。

关于spark-structured-streaming - Spark 结构化流异常处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53624398/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com