- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
在 Spark Streaming 中,可以(如果您要使用有状态操作,则是强制性的)设置 StreamingContext
将检查点执行到(AND)的可靠数据存储(S3,HDFS,...)中:
DStream
血统yourSparkStreamingCtx.checkpoint(datastoreURL)
DataStream
设置沿袭检查点间隔。只需调用
checkpoint(timeInterval)
在他们。实际上,建议将 lineage checkpoint 间隔设置在
DataStream
的 5 到 10 倍之间。的滑动间隔:
dstream.checkpoint(checkpointInterval). Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting to try.
ds.checkpoint(interval)
被称为 , 是否为所有数据流启用了沿袭检查点,默认值为
checkpointInterval
等于
batchInterval
?或者,相反,只有元数据检查点启用了什么?
最佳答案
检查 Spark 代码 (v1.5) 我发现 DStream
在两种情况下启用 s 的检查点:
通过明确调用他们的 checkpoint
方法(不是 StreamContext
的):
/**
* Enable periodic checkpointing of RDDs of this DStream
* @param interval Time interval after which generated RDD will be checkpointed
*/
def checkpoint(interval: Duration): DStream[T] = {
if (isInitialized) {
throw new UnsupportedOperationException(
"Cannot change checkpoint interval of an DStream after streaming context has started")
}
persist()
checkpointDuration = interval
this
}
DStream
只要具体的“DStream”子类被覆盖,初始化 mustCheckpoint
属性(将其设置为 true
):
private[streaming] def initialize(time: Time) {
...
...
// Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
if (mustCheckpoint && checkpointDuration == null) {
checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
logInfo("Checkpoint interval automatically set to " + checkpointDuration)
}
...
grep "val mustCheckpoint = true" $(find -type f -name "*.scala")
> ./org/apache/spark/streaming/api/python/PythonDStream.scala: override val mustCheckpoint = true
>./org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala: override val mustCheckpoint = true
>./org/apache/spark/streaming/dstream/StateDStream.scala: override val mustCheckpoint = true
PythonDStream
),
StreamingContext
检查点仅启用
StateDStream
的沿袭检查点和
ReducedWindowedDStream
实例。这些实例是转换的结果(分别为 AND):
关于apache-spark - DStreams 的 Spark 流检查点,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34550374/
我是Spark的新手。看起来超棒! 我有来自不同来源的每小时日志文件的内容,并且想用〜5分钟的滑动窗口从它们创建DStream,以探索相关性。 我只是想知道实现此目标的最佳方法是什么。我应该把它们切成
我想将 DStream 中的每个 RDD 加入一个非流式的、不变的引用文件。这是我的代码: val sparkConf = new SparkConf().setAppName("LogCounter
是否可以将流式传输 o.a.s.sql.Dataset 转换为 DStream?如果是,怎么办? 我知道如何将它转换为 RDD,但它是在流式上下文中。 最佳答案 这是不可能的。 Structured
我正在处理一个 java jar。累加器将流值相加。问题是,我想在每次递增时或在特定的周期间隔内在我的 UI 中显示该值。 但是,由于累加器的值只能从 Driver 程序中获取,因此在进程完成执行之前
Spark流textFileStream和fileStream可以监视目录并处理Dstream RDD中的新文件。 如何获取特定时间间隔内DStream RDD正在处理的文件名? 最佳答案 fileS
在 Spark Streaming 中,每一批数据总是生成一个且仅一个 RDD,为什么我们使用 foreachRDD() 来 foreach RDD? RDD只是一个,不需要foreach。在我的测试
无论数据量有多大,一个批处理的数据是否会在 DStream 中生成一个且仅一个 RDD? 最佳答案 是的,每个批处理间隔恰好有一个 RDD,在每个批处理间隔生成,与记录数量无关(包含在 RDD 中 -
我遇到了以下处理 Spark Streaming 中的消息的代码: val listRDD = ssc.socketTextStream(host, port) listRDD.foreachRDD(
我在 Spark Scala 中有一些 DStream,我想对它进行排序然后取前 N 个。问题是,每当我尝试运行它时,我都会得到 NotSerializableException 并且异常消息显示:
我正在尝试将部分函数传递给通过滑动窗口在 DStream 批处理中捕获的所有 RDD 的并集。假设我在离散为 1 秒批处理的流上构造了一个超过 10 秒的窗口操作: val ssc = new Str
我是 Spark 编程的新手。我有一个 Spark 流程序,它需要将接收到的 DStream 存储到数据库中。我想迭代我的 Dstream 并将每条记录存储到数据库中。 像这样。 JavaStream
我知道我们有一个 RDD 选项: JavaRDD javaRDD = coreRdd.toJavaRDD();` 是否可以将 Dstream 转换为 JavaDStream? 最佳答案 是的,您可以使
我一直面临关于将输出 Dstream 插入永久 SQL 表的“Spark Streaming”问题。我想将每个输出 DStream(来自激发进程的单个批次)插入到一个唯一的表中。我一直在使用 Pyth
我正在尝试使用 apache spark 流。我有一个数据源,来自 HDFS 的 csv 文件。 我打算用 Spark Stream 做以下事情: 使用 textFileStream 定期(5 分钟)
在 Spark Streaming 中,可以(如果您要使用有状态操作,则是强制性的)设置 StreamingContext将检查点执行到(AND)的可靠数据存储(S3,HDFS,...)中: 元数据
我正在使用 updateStateByKey()在我的 Spark Streaming 应用程序中维护状态的操作。输入数据来自 Kafka 主题。 我想了解 DStreams 是如何分区的? 分区如何
我已经经历了this stackoverflow 问题,根据答案,它创建了一个 DStream,批处理间隔只有一个 RDD。 例如: 我的批处理间隔是 1 分钟,Spark Streaming 作业正
如何从 dstream 窗口返回单个 rdd?: my_dstream_window : somedstream.window(3mins,1min) 假设上面的my_dstream_window包含
在函数中,有没有办法在使用 filter 后返回两个 DStream?例如,当我过滤一个DStream时,过滤后的将存储在一个DStream中,未过滤的将存储在另一个DStream中。 最佳答案 如果
使用 pyspark 从 kinesis 消费数据后,我有一个包含如下条目的 dstream: ('filename_1', [{'name': 'test'}, {'name': 'more'},
我是一名优秀的程序员,十分优秀!