- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
想知道为什么 StatefulNetworkWordCount.scala 示例调用臭名昭著的 updateStateByKey() 函数,该函数应该只将函数作为参数,而不是:
val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,
new HashPartitioner (ssc.sparkContext.defaultParallelism), true, initialRDD)
最佳答案
这是因为:
updateStateByKey
接收单个函数作为参数,而在 1.3 中他们对其进行了优化 updateStateByKey
的不同版本存在于 1.2 和 1.3 中。但是1.2没有4个参数的版本,只有1.3才引入:https://github.com/apache/spark/blob/branch-1.3/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala /**
* Return a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* org.apache.spark.Partitioner is used to control the partitioning of each RDD.
* @param updateFunc State update function. Note, that this function may generate a different
* tuple with a different key than the input key. Therefore keys may be removed
* or added in this way. It is up to the developer to decide whether to
* remember the partitioner despite the key being changed.
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new
* DStream
* @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs.
* @param initialRDD initial state value of each key.
* @tparam S State type
*/
def updateStateByKey[S: ClassTag](
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
rememberPartitioner: Boolean,
initialRDD: RDD[(K, S)]
): DStream[(K, S)] = {
new StateDStream(self, ssc.sc.clean(updateFunc), partitioner,
rememberPartitioner, Some(initialRDD))
}
关于scala - Spark 流示例使用附加参数调用 updateStateByKey,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28998408/
想知道为什么 StatefulNetworkWordCount.scala 示例调用臭名昭著的 updateStateByKey() 函数,该函数应该只将函数作为参数,而不是: val stateDs
我编写了一个与 updateStateByKey 一起使用的简单函数,以查看问题是否是因为我的 updateFunc。我认为这一定是由于其他原因。我在 --master local[4] 上运行它。
我正在运行一个 24X7 的 Spark 流并使用 updateStateByKey 函数来保存计算的历史数据,就像 NetworkWordCount Example 的情况一样.. 我试图流式传输一
如何通过 INPUT PostgreSQL 表的更改触发的 Spark 结构化流计算来更新 OUTPUT TABLE 的状态? 作为现实生活中的场景,USERS 表已被user_id = 0002 更
我正在使用 updateStateByKey()在我的 Spark Streaming 应用程序中维护状态的操作。输入数据来自 Kafka 主题。 我想了解 DStreams 是如何分区的? 分区如何
我正在尝试使用 Spark Streaming 编写一个简单的应用程序,以从 Kafka 读取数据,并持续计算从主题读取单词的次数。我在调用非常重要的 updateStateByKey 方法时遇到问题
我正在尝试合并两个流,其中一个应该是有状态的(比如不经常更新的静态数据): SparkConf conf = new SparkConf().setAppName("Test Application"
我正在尝试通过从 Kafka 读取的(假)apache Web 服务器日志运行有状态 Spark Streaming 计算。目标是“ session 化”类似于 this blog post 的网络流
我正在 24/7 全天候运行 Spark 流并使用 updateStateByKey是否可以 24/7 全天候运行 Spark Streaming?如果是,updateStateByKey 不会变大,
我在 Scala 中有这个通用方法 def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]]
当我遇到 updateStateByKey() 函数时,我刚刚开始寻找使用 Spark Streaming 进行有状态计算的解决方案。 我试图解决的问题: 10,000 个传感器每分钟产生一个二进制值
我在 Spark Streaming 应用程序中使用 updateStateByKey 函数来持久化和更新每个键的状态。问题是我想知道 “ key ”在更新函数里面。 input.updateStat
我是一名优秀的程序员,十分优秀!