Spark Streaming:如何定期刷新缓存的 RDD?
Spark Streaming: How to periodically refresh cached RDD?
在我的 Spark 流应用程序中,我想映射一个基于从后端 (ElasticSearch) 检索的字典的值。我想定期刷新字典,以防它在后端更新。它类似于 Logstash 翻译过滤器的定期刷新功能。我如何使用 Spark 实现这一点(例如,以某种方式每 30 秒取消一次 RDD)?
我发现最好的方法是重新创建 RDD 并维护对它的可变引用。 Spark Streaming 的核心是一个基于 Spark 的调度框架。我们可以借助调度器定期刷新 RDD。为此,我们使用仅为刷新操作安排的空 DStream:
def getData():RDD[Data] = ??? function to create the RDD we want to use af reference data
val dstream = ??? // our data stream
// a dstream of empty data
val refreshDstream = new ConstantInputDStream(ssc, sparkContext.parallelize(Seq())).window(Seconds(refreshInterval),Seconds(refreshInterval))
var referenceData = getData()
referenceData.cache()
refreshDstream.foreachRDD{_ =>
// evict the old RDD from memory and recreate it
referenceData.unpersist(true)
referenceData = getData()
referenceData.cache()
}
val myBusinessData = dstream.transform(rdd => rdd.join(referenceData))
... etc ...
过去,我也只尝试过 cache()
和 unpersist()
的交错,但没有结果(只刷新一次)。重新创建 RDD 会删除所有沿袭并提供新数据的干净加载。
步骤:
- 开始流式传输前缓存一次
- 一段时间后清除缓存(此处示例为 30 分钟)
可选:Hive table 可以将通过 spark 修复添加到 init。
spark.sql("msck repair table tableName")
import java.time.LocalDateTime
var caching_data = Data.init()
caching_data.persist()
var currTime = LocalDateTime.now()
var cacheClearTime = currTime.plusMinutes(30) // Put your time in Units
DStream.foreachRDD(rdd => if (rdd.take(1).length > 0) {
//Clear and Cache again
currTime = LocalDateTime.now()
val dateDiff = cacheClearTime.isBefore(currTime)
if (dateDiff) {
caching_data.unpersist(true) //blocking unpersist on boolean = true
caching_data = Data.init()
caching_data.persist()
currTime = LocalDateTime.now()
cacheClearTime = currTime.plusMinutes(30)
}
})
在我的 Spark 流应用程序中,我想映射一个基于从后端 (ElasticSearch) 检索的字典的值。我想定期刷新字典,以防它在后端更新。它类似于 Logstash 翻译过滤器的定期刷新功能。我如何使用 Spark 实现这一点(例如,以某种方式每 30 秒取消一次 RDD)?
我发现最好的方法是重新创建 RDD 并维护对它的可变引用。 Spark Streaming 的核心是一个基于 Spark 的调度框架。我们可以借助调度器定期刷新 RDD。为此,我们使用仅为刷新操作安排的空 DStream:
def getData():RDD[Data] = ??? function to create the RDD we want to use af reference data
val dstream = ??? // our data stream
// a dstream of empty data
val refreshDstream = new ConstantInputDStream(ssc, sparkContext.parallelize(Seq())).window(Seconds(refreshInterval),Seconds(refreshInterval))
var referenceData = getData()
referenceData.cache()
refreshDstream.foreachRDD{_ =>
// evict the old RDD from memory and recreate it
referenceData.unpersist(true)
referenceData = getData()
referenceData.cache()
}
val myBusinessData = dstream.transform(rdd => rdd.join(referenceData))
... etc ...
过去,我也只尝试过 cache()
和 unpersist()
的交错,但没有结果(只刷新一次)。重新创建 RDD 会删除所有沿袭并提供新数据的干净加载。
步骤:
- 开始流式传输前缓存一次
- 一段时间后清除缓存(此处示例为 30 分钟)
可选:Hive table 可以将通过 spark 修复添加到 init。
spark.sql("msck repair table tableName")
import java.time.LocalDateTime
var caching_data = Data.init()
caching_data.persist()
var currTime = LocalDateTime.now()
var cacheClearTime = currTime.plusMinutes(30) // Put your time in Units
DStream.foreachRDD(rdd => if (rdd.take(1).length > 0) {
//Clear and Cache again
currTime = LocalDateTime.now()
val dateDiff = cacheClearTime.isBefore(currTime)
if (dateDiff) {
caching_data.unpersist(true) //blocking unpersist on boolean = true
caching_data = Data.init()
caching_data.persist()
currTime = LocalDateTime.now()
cacheClearTime = currTime.plusMinutes(30)
}
})