在 Spark Streaming 中缓存 DStream

Caching DStream in Spark Streaming

我有一个从 kafka 读取数据的 Spark 流式进程, 进入 DStream。

在我的管道中我做了 两次(一个接一个):

DStream.foreachRDD( transformations on RDD and inserting into destination).

(每次做不同的处理,插入数据到不同的目的地)

我想知道 DStream.cache 在我从 Kafka 读取数据后如何工作?可以吗?

进程现在是否真的从Kafka读取了两次数据?

请记住,不可能将两个 foreachRDD 合二为一(因为两条路径完全不同,那里有状态转换 - 需要在 DStream 上应用...)

感谢您的帮助

有两个选项:

  • 使用Dstream.cache() 将底层 RDD 标记为已缓存。 Spark Streaming 将在超时后处理取消持久化 RDD,由 spark.cleaner.ttl 配置控制。

  • 使用额外的 foreachRDDcache()unpersist(false) 副作用操作应用于 DStream 中的 RDD:

例如:

val kafkaDStream = ???
val targetRDD = kafkaRDD
                       .transformation(...)
                       .transformation(...)
                       ...
// Right before the lineage fork mark the RDD as cacheable:
targetRDD.foreachRDD{rdd => rdd.cache(...)}
targetRDD.foreachRDD{do stuff 1}
targetRDD.foreachRDD{do stuff 2}
targetRDD.foreachRDD{rdd => rdd.unpersist(false)}

请注意,如果可以的话,您可以将缓存合并为 do stuff 1 的第一条语句。

我更喜欢此选项,因为它让我可以对缓存生命周期进行细粒度控制,并让我在需要时立即清理内容,而不是依赖于 ttl。