在 Spark Streaming 中缓存 DStream
Caching DStream in Spark Streaming
我有一个从 kafka 读取数据的 Spark 流式进程,
进入 DStream。
在我的管道中我做了 两次(一个接一个):
DStream.foreachRDD( transformations on RDD and inserting into destination).
(每次做不同的处理,插入数据到不同的目的地)
我想知道 DStream.cache 在我从 Kafka 读取数据后如何工作?可以吗?
进程现在是否真的从Kafka读取了两次数据?
请记住,不可能将两个 foreachRDD 合二为一(因为两条路径完全不同,那里有状态转换 - 需要在 DStream 上应用...)
感谢您的帮助
有两个选项:
使用Dstream.cache()
将底层 RDD 标记为已缓存。 Spark Streaming 将在超时后处理取消持久化 RDD,由 spark.cleaner.ttl
配置控制。
使用额外的 foreachRDD
将 cache()
和 unpersist(false)
副作用操作应用于 DStream 中的 RDD:
例如:
val kafkaDStream = ???
val targetRDD = kafkaRDD
.transformation(...)
.transformation(...)
...
// Right before the lineage fork mark the RDD as cacheable:
targetRDD.foreachRDD{rdd => rdd.cache(...)}
targetRDD.foreachRDD{do stuff 1}
targetRDD.foreachRDD{do stuff 2}
targetRDD.foreachRDD{rdd => rdd.unpersist(false)}
请注意,如果可以的话,您可以将缓存合并为 do stuff 1
的第一条语句。
我更喜欢此选项,因为它让我可以对缓存生命周期进行细粒度控制,并让我在需要时立即清理内容,而不是依赖于 ttl。
我有一个从 kafka 读取数据的 Spark 流式进程, 进入 DStream。
在我的管道中我做了 两次(一个接一个):
DStream.foreachRDD( transformations on RDD and inserting into destination).
(每次做不同的处理,插入数据到不同的目的地)
我想知道 DStream.cache 在我从 Kafka 读取数据后如何工作?可以吗?
进程现在是否真的从Kafka读取了两次数据?
请记住,不可能将两个 foreachRDD 合二为一(因为两条路径完全不同,那里有状态转换 - 需要在 DStream 上应用...)
感谢您的帮助
有两个选项:
使用
Dstream.cache()
将底层 RDD 标记为已缓存。 Spark Streaming 将在超时后处理取消持久化 RDD,由spark.cleaner.ttl
配置控制。使用额外的
foreachRDD
将cache()
和unpersist(false)
副作用操作应用于 DStream 中的 RDD:
例如:
val kafkaDStream = ???
val targetRDD = kafkaRDD
.transformation(...)
.transformation(...)
...
// Right before the lineage fork mark the RDD as cacheable:
targetRDD.foreachRDD{rdd => rdd.cache(...)}
targetRDD.foreachRDD{do stuff 1}
targetRDD.foreachRDD{do stuff 2}
targetRDD.foreachRDD{rdd => rdd.unpersist(false)}
请注意,如果可以的话,您可以将缓存合并为 do stuff 1
的第一条语句。
我更喜欢此选项,因为它让我可以对缓存生命周期进行细粒度控制,并让我在需要时立即清理内容,而不是依赖于 ttl。