gpt4 book ai didi

apache-spark - spark writeStream 到 kafka - awaitTermination() 与 awaitAnyTermination() 之间的区别

转载 作者:行者123 更新时间:2023-12-05 05:02:33 27 4
gpt4 key购买 nike

根据官方文档,我使用下面的代码段写入 kafka 主题,但它没有写入 kafka。

finalStream = final \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers",bootstrap_servers) \
.option("topic",topic_name) \
.option("checkpointLocation", check_point_location) \
.start()

finalStream.awaitTermination()

但是通过使用 awaitAnyTermination() 而不是 awaitTermination(),写入 kafka 是可行的。

spark.streams.awaitAnyTermination()

请指出这背后的原因。

最佳答案

"Difference between awaitTermination() vs awaitAnyTermination()"

引用源代码中的注释

awaitTermination : "等待此查询的终止,通过 query.stop() 或异常终止。如果查询因异常终止,则异常将被抛出。否则,它返回查询是否是否在 timeoutMs 毫秒内终止。如果查询已终止,则对该方法的所有后续调用将立即返回 true(如果查询由 stop() 终止),或立即抛出异常(如果查询已异常终止)。”

awaitAnyTermination : "等到相关 SQLContext 上的任何查询终止自上下文创建以来,或自 resetTerminated() 被调用。如果任何查询因异常而终止,则异常将是抛出。如果查询已终止,则对 awaitAnyTermination() 的后续调用将立即返回(如果查询被 query.stop() 终止),或立即抛出异常(如果查询因异常而终止)。使用 resetTerminated()清除过去的终止并等待新的终止。在调用 resetTermination() 后终止多个查询的情况下,如果任何查询以异常终止,则 awaitAnyTermination() 将抛出任何异常。为了正确记录多个异常查询,用户需要在其中任何一个异常终止后停止所有查询,然后检查每个查询的 query.exception()。”

关于apache-spark - spark writeStream 到 kafka - awaitTermination() 与 awaitAnyTermination() 之间的区别,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62155006/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com