gpt4 book ai didi

apache-spark - 如何使用外部触发器停止结构化流式查询?

转载 作者:行者123 更新时间:2023-12-03 21:17:05 25 4
gpt4 key购买 nike

我正在使用 Spark 结构化流媒体,我想检查是否有 stop文件存在以退出我的程序。

我可以做这样的事情:

def main(args: Array[String]) = {
val query = SparkSession...load.writeStream.foreachBatch{
if (stop file exist) exit(0)
// do some processing here
}.start()
// add Execute Listener here to listen query
query.awaitTermination()
}

但是,这只能在有新行附加到此查询表时触发。如果没有新行, stop文件不会有任何影响。

实现这个触发器有什么更好的主意吗?

以上是问题,感谢下面接受的答案,这是我最终运行良好的代码。
object QueryManager {
def queryTerminator(query: StreamingQuery): Runnable = new Runnable {
override def run() = {if(stop condition) query.stop()}
}
def listenTermination(query: StreamingQuery) = {
Executors.newSingleThreadScheduledExecutor.scheduleWithFixedDelay(
queryTerminator(query), initialDelay=1, delay=1, SECONDS
)
}
}
// and in main method
def main(args: Array[String]) = {
val query = SparkSession...load.writeStream.foreachBatch{
// do some processing here
}.start()
// add Execute Listener here to listen query
QueryManager.listenTermination(query)

query.awaitTermination()


// I am not familar with scala,
// but it seems would not exit if we do not add this
System.exit(0)
}

如果有任何错误,请告诉我。

最佳答案

Any better idea to implement this trigger?



流查询是结构化流应用程序的单独守护线程。它会一直运行,直到使用 StreamingQuery.stop 停止为止。 .

至少有两种方法可以访问正在运行的流查询:
  • DataStreamWriter.start()
  • StreamingQueryManager

  • 这个想法是在你的结构化流应用程序中有一个“控制线程”,它会监听停止请求(带有一个或多个流查询的 ID)并简单地执行 stop关于正在运行的流查询。

    将 Spark Structured Streaming 应用程序视为具有多个线程的单 JVM 应用程序。您可以再有一个来控制线程。这就是基本的想法。

    关于apache-spark - 如何使用外部触发器停止结构化流式查询?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59340568/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com