gpt4 book ai didi

apache-spark - Spark Streaming - window() 是否缓存 DStreams?

转载 作者:行者123 更新时间:2023-12-05 04:13:30 26 4
gpt4 key购买 nike

谁能解释一下 Spark Streaming 是如何执行 window() 操作的?从 Spark 1.6.1 文档来看,窗口批处理似乎自动缓存在内存中,但查看 Web UI 似乎已经在先前批处理中执行的操作再次执行。为了您的方便,我在下面附上了我正在运行的应用程序的屏幕截图:

Spark DAG

通过查看 Web UI,似乎缓存了 flatMapValues() RDD(绿点 - 这是我在 DStream 上调用 window() 之前执行的最后一个操作),但与此同时,它也似乎再次执行之前批处理中导致 flatMapValues() 的所有转换。如果是这种情况,window() 操作可能会导致巨大的性能损失,特别是如果窗口持续时间为 1 或 2 小时(正如我对我的应用程序所期望的那样)。您认为当时检查 DStream 会有帮助吗?考虑到预期的幻灯片窗口大约为 5 分钟。

希望有人能澄清这一点。

编辑

我添加了一个代码片段。 Stream1 和 Stream2 是从 HDFS 读取的数据源

JavaPairDStream<String, String> stream1 = cdr_orig.mapToPair(parserFunc)
.flatMapValues(new Function<String, Iterable<String>>() {
@Override
public Iterable<String> call(String s) {
return Arrays.asList(s.split(","));
}
}).window(Durations.seconds(WINDOW_DURATION), Durations.seconds(SLIDE_DURATION));


JavaPairDStream<String, String> join = stream2.join(stream1);

这两个流由另一个系统周期性地产生。这些流是异步的,这意味着在时间 t stream2 中的记录在时间 t'<=t 出现在 stream1 中。我正在使用 window() 将 stream1 记录缓存 1-2 小时,但如果对过去批处理的 stream1 的操作将在每个新批处理中执行,这可能效率低下。

最佳答案

首先,是的,window() 通过调用 persist 来缓存 dStream。在这里缓存是指数据保存在内存中。默认存储级别为 StorageLevel.MEMORY_ONLY_SER 即

Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.

窗口转换的作用是返回一个新的 DStream,其中每个 RDD 包含在该 DStream 上的滑动时间窗口中看到的所有元素。在内部,它创建了一个 WindowedDStream 对象,该对象调用 persist() 来缓存 DStream,并且根据 Spark API 文档,“它默认在父级别持久存在,因为这些 RDD 显然将被重用。”

因此,您不能依赖 Window 来缓存 DStreams。如果您想减少转换,您应该在该 DStream 和其他转换之前调用 persist()。

在您的情况下,尝试调用 persist,如下所示:

cdr_orig.persist(StorageLevel.MEMORY_AND_DISK);

在进行 mapToPair 转换之前。您会看到一个更紧凑的 DAG 将形成,顶部有绿点。

关于apache-spark - Spark Streaming - window() 是否缓存 DStreams?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37494411/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com