- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个类似 this 的 Spark 流/DStream 应用程序:
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
我的上下文使用配置文件,我可以在其中使用 appConf.getString
等方法提取项目。所以我实际上使用:
val context = StreamingContext.getOrCreate(
appConf.getString("spark.checkpointDirectory"),
() => createStreamContext(sparkConf, appConf))
其中 val sparkConf = new SparkConf()...
。
如果我停止我的应用程序并更改应用程序文件中的配置,除非我删除检查点目录内容,否则这些更改不会生效。例如,我想动态更改 spark.streaming.kafka.maxRatePerPartition
或 spark.windowDurationSecs
。 (编辑:我终止应用程序,更改配置文件,然后重新启动应用程序。)我怎样才能动态更改这些设置或强制执行(已编辑的字词)配置更改而不破坏我的检查点目录(哪个即将包括状态信息的检查点)?
最佳答案
How can I do dynamically change these settings or enforce a configuration change without trashing my checkpoint directory?
如果深入了解 StreamingContext.getOrCreate
的代码:
def getOrCreate(
checkpointPath: String,
creatingFunc: () => StreamingContext,
hadoopConf: Configuration = SparkHadoopUtil.get.conf,
createOnError: Boolean = false
): StreamingContext = {
val checkpointOption = CheckpointReader.read(
checkpointPath, new SparkConf(), hadoopConf, createOnError)
checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
}
你可以看到,如果 CheckpointReader
在类路径中有检查点数据,它使用 new SparkConf()
作为参数,因为重载不允许传递自定义创建的 SparkConf
。默认情况下,SparkConf
将加载声明为环境变量或传递给类路径的任何设置:
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
import SparkConf._
/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)
因此,实现您想要的目标的一种方法是,您可以通过 spark.driver.extraClassPath
和 传递参数,而不是在代码中创建
到 SparkConf
对象>spark.executor.extraClassPathspark-submit
。
关于apache-spark - 使用检查点 Spark Stream 的中流更改配置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36832761/
我正在尝试将资源/流设置为 Android 中的墙纸。我使用 WallpaperManager 类及其方法 setResource/setStream 来执行此操作。我通常在将图像设置为墙纸之前使用
我正在编写脚本来通过反复取消对象直到 EOF 来处理(非常大的)文件。我想对文件进行分区并让单独的进程(在云中)解开并处理单独的部分。 但是我的分区器并不智能,它不知道文件中 pickle 对象之间的
我正在实现图形表示。 Map>> g = new HashMap<>(); Graph 类中的一个方法是 List> getAllEdges() { List> allEdges = new
我正在通过 MediaCodec 处理实时流,并且有一个场景,其中 MediaFormat 在流中更改(即:正在解码的视频的分辨率发生更改)。鉴于我将解码器附加到 Surface 以在我检测到传入流的
嗨 iCoders 目前我正在开发一个使用 OpenTok/TokBox iOS SDK 进行直播的应用程序。我怀疑有多少订阅者可以订阅发布者发布的流。我在 openTok 论坛中搜索过这个但是没有找
我是一名优秀的程序员,十分优秀!