
scala - Spark Streaming example calls updateStateByKey with additional parameters

Reposted. Author: 行者123. Updated: 2023-12-04 20:01:38

I am wondering why the StatefulNetworkWordCount.scala example calls the infamous updateStateByKey() function, which is supposed to take only a function as its parameter, like this:

val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
  new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)

Why is it necessary to also pass a partitioner, a boolean, and an RDD, and how is that handled, given that these are not in the signature of updateStateByKey()?

Thanks,
Matt

Best Answer

This is because:

  • You are looking at a different Spark release branch: https://github.com/apache/spark/blob/branch-1.3/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala. In Spark 1.2 this example called updateStateByKey with only a single function as its argument, while in 1.3 the example was optimized.
  • Different overloads of updateStateByKey exist in both 1.2 and 1.3, but the 4-argument overload does not exist in 1.2; it was introduced only in 1.3: https://github.com/apache/spark/blob/branch-1.3/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala

  • Here is the code:
    /**
     * Return a new "state" DStream where the state for each key is updated by applying
     * the given function on the previous state of the key and the new values of each key.
     * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
     * @param updateFunc State update function. Note, that this function may generate a different
     *                   tuple with a different key than the input key. Therefore keys may be removed
     *                   or added in this way. It is up to the developer to decide whether to
     *                   remember the partitioner despite the key being changed.
     * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
     *                    DStream
     * @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs.
     * @param initialRDD initial state value of each key.
     * @tparam S State type
     */
    def updateStateByKey[S: ClassTag](
        updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
        partitioner: Partitioner,
        rememberPartitioner: Boolean,
        initialRDD: RDD[(K, S)]
      ): DStream[(K, S)] = {
      new StateDStream(self, ssc.sc.clean(updateFunc), partitioner,
        rememberPartitioner, Some(initialRDD))
    }
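    For context, here is a minimal pure-Scala sketch (no Spark dependency, so it runs standalone) of how the 1.3 example builds newUpdateFunc: it adapts a simple per-key update function of type (Seq[V], Option[S]) => Option[S] to the iterator-based signature Iterator[(K, Seq[V], Option[S])] => Iterator[(K, S)] that the 4-argument overload expects. The names follow StatefulNetworkWordCount; the sample batch data is made up for illustration:

    ```scala
    object UpdateFuncSketch {
      // Simple per-key update: add the new counts to the previous state.
      val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
        (values, state) => Some(values.sum + state.getOrElse(0))

      // Adapter to the iterator-based signature required by the
      // 4-argument updateStateByKey overload:
      //   Iterator[(K, Seq[V], Option[S])] => Iterator[(K, S)]
      val newUpdateFunc: Iterator[(String, Seq[Int], Option[Int])] => Iterator[(String, Int)] =
        iterator => iterator.flatMap { case (word, counts, state) =>
          updateFunc(counts, state).map(sum => (word, sum))
        }

      def main(args: Array[String]): Unit = {
        // One hypothetical micro-batch: "spark" seen twice with prior state 3,
        // "scala" seen once with no prior state.
        val batch = Iterator(("spark", Seq(1, 1), Some(3)), ("scala", Seq(1), None))
        println(newUpdateFunc(batch).toList) // List((spark,5), (scala,1))
      }
    }
    ```

    The partitioner, boolean, and initialRDD are then simply the remaining three arguments of the 4-argument overload shown above; the adapter only concerns the first one.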

    About "scala - Spark Streaming example calls updateStateByKey with additional parameters": a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/28998408/
