gpt4 book ai didi

java - Spark streaming mapWithState 超时延迟?

转载 作者:搜寻专家 更新时间:2023-11-01 02:39:43 26 4
gpt4 key购买 nike

我希望 Spark 1.6+ 的新 mapWithState API 能够几乎立即删除超时的对象,但存在延迟。

我正在使用 JavaStatefulNetworkWordCount 的改编版本测试 API下面:

SparkConf sparkConf = new SparkConf()
.setAppName("JavaStatefulNetworkWordCount")
.setMaster("local[*]");

JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
ssc.checkpoint("./tmp");

StateSpec<String, Integer, Integer, Tuple2<String, Integer>> mappingFunc =
StateSpec.function((word, one, state) -> {
if (state.isTimingOut())
{
System.out.println("Timing out the word: " + word);
return new Tuple2<String,Integer>(word, state.get());
}
else
{
int sum = one.or(0) + (state.exists() ? state.get() : 0);
Tuple2<String, Integer> output = new Tuple2<String, Integer>(word, sum);
state.update(sum);
return output;
}
});

JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
StorageLevels.MEMORY_AND_DISK_SER_2)
.flatMap(x -> Arrays.asList(SPACE.split(x)))
.mapToPair(w -> new Tuple2<String, Integer>(w, 1))
.mapWithState(mappingFunc.timeout(Durations.seconds(5)));

stateDstream.stateSnapshots().print();

连同 nc ( nc -l -p <port> )

当我在 nc 窗口中键入一个词时,我看到控制台每秒打印一个元组。但根据超时设置,超时消息似乎并没有像预期的那样在 5 秒后打印出来。元组过期所需的时间似乎在 5 到 20 秒之间变化。

我是否遗漏了某些配置选项,或者超时可能仅与检查点同时执行?

最佳答案

一旦事件超时,它不会立即被删除,但只会通过将其保存到“deltaMap”来标记为删除:

override def remove(key: K): Unit = {
val stateInfo = deltaMap(key)
if (stateInfo != null) {
stateInfo.markDeleted()
} else {
val newInfo = new StateInfo[S](deleted = true)
deltaMap.update(key, newInfo)
}
}

然后,超时事件被收集并发送到输出流仅在检查点。也就是说:在批处理 t 超时的事件将仅在下一个检查点出现在输出流中 - 默认情况下,平均在 5 个批处理间隔之后,即批处理 t+5:

 override def checkpoint(): Unit = {
super.checkpoint()
doFullScan = true
}

...

removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled

...

// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
...

只有当元素足够多并且状态图被序列化时,元素才会被移除——目前也只发生在检查点:

  /** Whether the delta chain length is long enough that it should be compacted */
def shouldCompact: Boolean = {
deltaChainLength >= deltaChainThreshold
}
// Write the data in the parent state map while copying the data into a new parent map for
// compaction (if needed)
val doCompaction = shouldCompact
...

默认情况下,检查点每 10 次迭代发生一次,因此在上面的示例中每 10 秒一次;由于您的超时时间为 5 秒,因此事件预计会在 5-15 秒内发生。

编辑:根据@YuvalItzchakov 的评论更正和详尽的回答

关于java - Spark streaming mapWithState 超时延迟?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36552803/

26 4 0