gpt4 book ai didi

apache-spark - 如何在awaitTermination后获取流查询的进度?

转载 作者:行者123 更新时间:2023-12-03 23:13:51 30 4
gpt4 key购买 nike

我是 spark 新手,正在阅读有关监控 spark 应用程序的一些内容。基本上,我想知道在给定的触发时间和查询进度中,spark 应用程序处理了多少条记录。我知道“lastProgress”给出了所有这些指标,但是当我将 awaitTermination 与“lastProgress”一起使用时,它总是返回 null。

 val q4s = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.load()
.writeStream
.outputMode("append")
.option("checkpointLocation", checkpoint_loc)
.trigger(Trigger.ProcessingTime("10 seconds"))
.format("console")
.start()

println("Query Id: "+ q4s.id.toString())
println("QUERY PROGRESS.........")
println(q4s.lastProgress);
q4s.awaitTermination();

输出:
Query Id: efd6bc15-f10c-4938-a1aa-c81fdb2b33e3
QUERY PROGRESS.........
null

如何在使用 awaitTermination 时获取查询进度,或者如何在不使用 awaitTermination 的情况下保持查询持续运行?

提前致谢。

最佳答案

您必须使用对流式查询的引用来启动一个单独的线程以进行监控(比如 q4s )并定期拉取进度。

启动查询的线程(Spark Structured Streaming 应用程序的主线程)通常是 awaitTermination所以它启动的流查询的守护进程可以继续运行。

关于apache-spark - 如何在awaitTermination后获取流查询的进度?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54436822/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com