Spark Streaming - window() 是否缓存 DStream?

Spark Streaming - Is window() caching DStreams?

谁能解释一下 Spark Streaming 如何执行 window() 操作?从 Spark 1.6.1 文档来看,windowed 批处理似乎自动缓存在内存中,但查看网络 UI 似乎已在先前批处理中执行的操作再次执行。为了您的方便,我在下面附上了我的 运行 应用程序的屏幕截图:

通过查看网络 UI,似乎缓存了 flatMapValues() RDD(绿点 - 这是我在 DStream 上调用 window() 之前执行的最后一个操作),但与此同时,似乎所有导致前几批 flatMapValues() 的转换都再次执行。如果是这种情况,window() 操作可能会导致巨大的性能损失,特别是如果 window 持续时间为 1 或 2 小时(正如我对我的应用程序所期望的那样)。您认为当时检查 DStream 会有帮助吗?考虑到预期的幻灯片 window 大约是 5 分钟。

希望有人能澄清这一点。

编辑

我添加了一个代码片段。 Stream1 和 Stream2 是从 HDFS

读取的数据源
JavaPairDStream<String, String> stream1 = cdr_orig.mapToPair(parserFunc)
                                                 .flatMapValues(new Function<String, Iterable<String>>() {
                                                   @Override
                                                   public Iterable<String> call(String s) {
                                                     return Arrays.asList(s.split(","));
                                                   }
                                                 }).window(Durations.seconds(WINDOW_DURATION), Durations.seconds(SLIDE_DURATION));


JavaPairDStream<String, String> join = stream2.join(stream1);

这两个流是由另一个系统定期生成的。这些流是异步的,这意味着在时间 t stream2 中的记录在时间 t'<=t 出现在 stream1 中。我正在使用 window() 将 stream1 记录缓存 1-2 小时,但如果对过去批次的 stream1 的操作将在每个新批次中执行,这可能效率低下。

首先是 window() 通过调用 persist 来缓存 dStream。在这里缓存是指数据保存在内存中。默认存储级别为 StorageLevel.MEMORY_ONLY_SER 即

Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.

window 转换所做的是 returns 一个新的 DStream,其中每个 RDD 包含在该 DStream 上的滑动 window 时间中看到的所有元素。在内部,它创建一个 WindowedDStream 对象,它调用 persist() 来缓存 DStream 并且根据 Spark API 文档 "it persists at parent level by default, as those RDDs are going to be obviously reused."

因此,您不能依赖 Window 来缓存 DStream。如果你想减少转换,你应该在之前的那个 DStreams 上调用 persist() 和其他转换。

在你的情况下,尝试调用 persist,如下所示:

cdr_orig.persist(StorageLevel.MEMORY_AND_DISK);

在进行 mapToPair 转换之前。你会看到一个更紧凑的 DAG 将形成,顶部有绿点。