- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我希望 Spark 1.6+ 的新 mapWithState API 能够几乎立即删除超时的对象,但存在延迟。
我正在使用 JavaStatefulNetworkWordCount 的改编版本测试 API下面:
SparkConf sparkConf = new SparkConf()
.setAppName("JavaStatefulNetworkWordCount")
.setMaster("local[*]");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
ssc.checkpoint("./tmp");
StateSpec<String, Integer, Integer, Tuple2<String, Integer>> mappingFunc =
StateSpec.function((word, one, state) -> {
if (state.isTimingOut())
{
System.out.println("Timing out the word: " + word);
return new Tuple2<String,Integer>(word, state.get());
}
else
{
int sum = one.or(0) + (state.exists() ? state.get() : 0);
Tuple2<String, Integer> output = new Tuple2<String, Integer>(word, sum);
state.update(sum);
return output;
}
});
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
StorageLevels.MEMORY_AND_DISK_SER_2)
.flatMap(x -> Arrays.asList(SPACE.split(x)))
.mapToPair(w -> new Tuple2<String, Integer>(w, 1))
.mapWithState(mappingFunc.timeout(Durations.seconds(5)));
stateDstream.stateSnapshots().print();
连同 nc ( nc -l -p <port>
)
当我在 nc 窗口中键入一个词时,我看到控制台每秒打印一个元组。但根据超时设置,超时消息似乎并没有像预期的那样在 5 秒后打印出来。元组过期所需的时间似乎在 5 到 20 秒之间变化。
我是否遗漏了某些配置选项,或者超时可能仅与检查点同时执行?
最佳答案
一旦事件超时,它不会立即被删除,但只会通过将其保存到“deltaMap”来标记为删除:
override def remove(key: K): Unit = {
val stateInfo = deltaMap(key)
if (stateInfo != null) {
stateInfo.markDeleted()
} else {
val newInfo = new StateInfo[S](deleted = true)
deltaMap.update(key, newInfo)
}
}
然后,超时事件被收集并发送到输出流仅在检查点。也就是说:在批处理 t 超时的事件将仅在下一个检查点出现在输出流中 - 默认情况下,平均在 5 个批处理间隔之后,即批处理 t+5:
override def checkpoint(): Unit = {
super.checkpoint()
doFullScan = true
}
...
removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled
...
// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
...
只有当元素足够多并且状态图被序列化时,元素才会被移除——目前也只发生在检查点:
/** Whether the delta chain length is long enough that it should be compacted */
def shouldCompact: Boolean = {
deltaChainLength >= deltaChainThreshold
}
// Write the data in the parent state map while copying the data into a new parent map for
// compaction (if needed)
val doCompaction = shouldCompact
...
默认情况下,检查点每 10 次迭代发生一次,因此在上面的示例中每 10 秒一次;由于您的超时时间为 5 秒,因此事件预计会在 5-15 秒内发生。
编辑:根据@YuvalItzchakov 的评论更正和详尽的回答
关于java - Spark streaming mapWithState 超时延迟?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36552803/
我正在处理 Scala (2.11)/Spark (1.6.1) 流项目并使用 mapWithState()跟踪从以前的批次中看到的数据。 状态分为 20 个分区,由 StateSpec.functi
升级到 Spark 1.6.1 后,我开始重构应用程序以将 updateStateByKey 替换为 mapWithState。 为了利用新 API 的性能优势,我不想调用加载所有状态的 stateS
我在 mapWithState 中保留了一对由字符串组成的键和一个包含数组的对象作为状态。如果出现包含相同键的新流,我将更新数组。如果 spark 应用程序在多个节点上运行,或者 spark 一次只允
我正在使用 spark mapwithstate,但存储空间仍在继续增长。 问题 1。 MapPartitionsRDD 内存大小 9GB x 20 你能减少这个大小吗? 问题 2。 And in I
我正在处理 Scala (2.11)/Spark (1.6.1) 流项目并使用 mapWithState()跟踪从以前的批次中看到的数据。 状态分布在多个节点上的 20 个分区中,创建于 StateS
我一直在 Spark Streaming 中使用 mapWithState API,但关于 StateSpec.function 有两件事不清楚: 假设我的功能是: def trackStateFor
如何访问由多个微批处理构建的所有键的状态。 val stateSpec = StateSpec.function(stateUpdate _) .numPartitions(numPartitio
我希望 Spark 1.6+ 的新 mapWithState API 能够几乎立即删除超时的对象,但存在延迟。 我正在使用 JavaStatefulNetworkWordCount 的改编版本测试 A
想象一个用例,其中每个用户都有事件流,但只对第一周的事件感兴趣。在该时间范围内,使用 mapWithState 进行有状态逻辑。在此期间之后,应忽略用户传入事件。 由于用户的状态需要内存,因此在用户一
我有 2 个 JavaPairDStream。它们具有相同的键(类型和值)和相同的值类型(不同的值)。我需要它们共享相同的状态以返回基于当前状态的结果,因此我使用相同的 mapWithState 函数
我正在使用 spark 从 Kafka Stream 接收数据,以接收有关发送定期健康更新的 IOT 设备的状态以及有关设备中存在的各种传感器的状态。我的 Spark 应用程序监听单个主题以使用 Sp
我正在使用函数 mapWithState() 在我的 spark 流应用程序中计算 UV。在 mapWithState 之后,我得到一个 dstream 和 foreachRDD。在函数foreach
我已成功集成代码以从事件中心拉取消息并通过 Spark/spark-streaming 处理它们。我现在开始在消息传递时管理状态。这是我正在使用的代码,其中大部分是 https://docs.clou
我有一个 Spark scala 流应用程序,它使用 mapWithState session 化来自 Kafka 的用户生成的事件。我想通过在维护的情况下启用暂停和恢复应用程序来完善设置。我已经将
我在尝试在 spark 中同时使用 broadcast、mapWithState 和 checkpointing 时遇到 spark streaming 作业的问题。 用法如下: 因为我必须将一些
我是一名优秀的程序员,十分优秀!