gpt4 book ai didi

apache-spark - 重启 Spark Streaming 应用程序的最佳方法是什么?

转载 作者:行者123 更新时间:2023-12-02 01:17:55 25 4
gpt4 key购买 nike

我基本上想在我的驱动程序中编写一个事件回调,它将在该事件到达时重新启动 spark 流应用程序。 我的驱动程序通过从文件中读取配置来设置流和执行逻辑。每当文件更改(添加新配置)时,驱动程序必须按顺序执行以下步骤,

  1. 重启,
  2. 读取配置文件(作为主要方法的一部分)和
  3. 设置流

实现此目标的最佳方法是什么?

最佳答案

在某些情况下,您可能希望动态地重新加载流上下文(例如重新加载流操作)。在这种情况下,您可以(Scala 示例):

val sparkContext = new SparkContext()

val stopEvent = false
var streamingContext = Option.empty[StreamingContext]
val shouldReload = false

val processThread = new Thread {
override def run(): Unit = {
while (!stopEvent){
if (streamingContext.isEmpty) {

// new context
streamingContext = Option(new StreamingContext(sparkContext, Seconds(1)))

// create DStreams
val lines = streamingContext.socketTextStream(...)

// your transformations and actions
// and decision to reload streaming context
// ...

streamingContext.get.start()
} else {
if (shouldReload) {
streamingContext.get.stop(stopSparkContext = false, stopGracefully = true)
streamingContext.get.awaitTermination()
streamingContext = Option.empty[StreamingContext]
} else {
Thread.sleep(1000)
}
}

}
streamingContext.get.stop(stopSparkContext =true, stopGracefully = true)
streamingContext.get.awaitTermination()
}
}

// and start it in separate thread
processThread.start()
processThread.join()

或者在 python 中:

spark_context = SparkContext()

stop_event = Event()
spark_streaming_context = None
should_reload = False

def process(self):
while not stop_event.is_set():
if spark_streaming_context is None:

# new context
spark_streaming_context = StreamingContext(spark_context, 0.5)

# create DStreams
lines = spark_streaming_context.socketTextStream(...)

# your transformations and actions
# and decision to reload streaming context
# ...

self.spark_streaming_context.start()
else:
# TODO move to config
if should_reload:
spark_streaming_context.stop(stopSparkContext=False, stopGraceFully=True)
spark_streaming_context.awaitTermination()
spark_streaming_context = None
else:
time.sleep(1)
else:
self.spark_streaming_context.stop(stopGraceFully=True)
self.spark_streaming_context.awaitTermination()


# and start it in separate thread
process_thread = threading.Thread(target=process)
process_thread.start()
process_thread.join()

如果你想防止代码崩溃并从最后一个地方重新启动streaming context使用checkpointing机制。它允许您在失败后恢复作业状态。

关于apache-spark - 重启 Spark Streaming 应用程序的最佳方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41721240/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com