
apache-spark - Spark Streaming checkpointing of DStreams


In Spark Streaming it is possible (and mandatory if you are going to use stateful operations) to set the StreamingContext to perform checkpointing into a reliable data store (S3, HDFS, ...) of both:

  • Metadata
  • DStream lineage

As described here, to set the output data store you need to call yourSparkStreamingCtx.checkpoint(datastoreURL). On the other hand, a lineage checkpoint interval can be set for each DStream by calling checkpoint(timeInterval) on it. In fact, the recommendation is to set the lineage checkpoint interval to between 5 and 10 times the DStream's sliding interval (a minimal setup is sketched after the quote below):

    dstream.checkpoint(checkpointInterval). Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting to try.
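    As a minimal sketch of the two settings (the application name, local master, checkpoint path, host, and port below are illustrative assumptions, not part of the original question):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object CheckpointSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("checkpoint-sketch").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(2)) // 2-second batch (and default slide) interval

        // Metadata checkpointing: point the context at a reliable store
        // (an HDFS or S3 URL in production; a local path only for this sketch)
        ssc.checkpoint("/tmp/spark-checkpoints")

        val lines = ssc.socketTextStream("localhost", 9999)

        // Lineage checkpointing: 5 - 10x the slide interval, per the recommendation above
        lines.checkpoint(Seconds(10))

        lines.count().print()
        ssc.start()
        ssc.awaitTermination()
      }
    }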



My question is:

When the streaming context has been set to perform checkpointing and no ds.checkpoint(interval) is called, is lineage checkpointing enabled for all DStreams, with a default checkpointInterval equal to the batchInterval? Or, on the contrary, is only metadata checkpointing enabled?

Best Answer

Checking the Spark code (v1.5), I found that checkpointing of DStreams is enabled in two cases:

By an explicit call to their checkpoint method (not the StreamingContext's):

    /**
     * Enable periodic checkpointing of RDDs of this DStream
     * @param interval Time interval after which generated RDD will be checkpointed
     */
    def checkpoint(interval: Duration): DStream[T] = {
      if (isInitialized) {
        throw new UnsupportedOperationException(
          "Cannot change checkpoint interval of an DStream after streaming context has started")
      }
      persist()
      checkpointDuration = interval
      this
    }

By DStream initialization, whenever the concrete DStream subclass has overridden the mustCheckpoint attribute (setting it to true):
    private[streaming] def initialize(time: Time) {
      ...
      ...
      // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
      if (mustCheckpoint && checkpointDuration == null) {
        checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
        logInfo("Checkpoint interval automatically set to " + checkpointDuration)
      }
      ...
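    For example, with a slideDuration of 4 seconds this default works out to 4 s * ceil(10 / 4) = 4 s * 3 = 12 seconds, i.e. the smallest multiple of the slide interval that is at least 10 seconds.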

The first case is obvious. Performing a naive analysis over the Spark Streaming code:
    grep "val mustCheckpoint = true" $(find -type f -name "*.scala")

    ./org/apache/spark/streaming/api/python/PythonDStream.scala: override val mustCheckpoint = true
    ./org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala: override val mustCheckpoint = true
    ./org/apache/spark/streaming/dstream/StateDStream.scala: override val mustCheckpoint = true

    I found that, leaving PythonDStream aside, StreamingContext checkpointing by itself only enables lineage checkpointing for StateDStream and ReducedWindowedDStream instances. These instances are the result of the following transformations (both sketched below):
  • updateStateByKey: that is, a stream that carries a state across several windows.
  • reduceByKeyAndWindow
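
    A minimal sketch of both transformations (the source, host, port, and interval values are illustrative assumptions; note that only the variant of reduceByKeyAndWindow that takes an inverse reduce function creates a ReducedWindowedDStream):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StatefulSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("stateful-sketch").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(2))
        // Required: both DStreams below set mustCheckpoint = true
        ssc.checkpoint("/tmp/spark-checkpoints")

        val pairs = ssc.socketTextStream("localhost", 9999).map(word => (word, 1))

        // updateStateByKey produces a StateDStream (mustCheckpoint = true)
        val runningCounts = pairs.updateStateByKey[Int] { (values: Seq[Int], state: Option[Int]) =>
          Some(values.sum + state.getOrElse(0))
        }

        // reduceByKeyAndWindow with an inverse function produces a
        // ReducedWindowedDStream (mustCheckpoint = true)
        val windowedCounts = pairs.reduceByKeyAndWindow(
          (a: Int, b: Int) => a + b, // new values entering the window
          (a: Int, b: Int) => a - b, // old values leaving the window
          Seconds(20),               // window length
          Seconds(4))                // slide interval

        runningCounts.print()
        windowedCounts.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }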
    Regarding apache-spark - Spark Streaming checkpointing of DStreams, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/34550374/
