如何在 Spark Streaming 上保持固定大小
How to persist a fixed size on spark streaming
我正在尝试使用 Spark 中的持久化功能将数据持久化到内存中并对其进行计算。
就我而言,我只想保留 2GB DStream。这是我的代码:
val conf = new SparkConf()
.setAppName("File Count")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val file = ssc.textFileStream("hdfs://192.168.1.31:8020/user/sparkStreaming/input")
var test = file.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
val windowed = test.reduceByKey(((a:Int,b:Int) => (a + b)))
windowed.persist(MEMORY_ONLY_SER)
当我达到2GB时,我再做一次处理,我使用unpersist来释放内存。
谁能帮我看看我坚持了多少?
如果我知道我坚持了多少,我如何将它用作传导((如果 PERSISTED == 2 GB)做治疗)?
在Spark Streaming中,persist
或cache
不会为每批数据不断地向内存中添加数据。它所做的是表明底层 RDD 应该被缓存,以便对它的进一步操作应用于 RDD 的记忆计算。所以,这不是一个累积的过程。
如果你想积累数据,你可以这样做:
var addedRDD = sparkContext.emptyRDD
...
dstream.foreachRDD{ rdd =>
addedRDD = addedRDD union rdd
addedRDD.cache()
}
这最终会给你带来麻烦,因为这个 RDD 的大小和复杂性会增加(我鼓励你在几次迭代后在 Spark UI 中检查它)
要获得使用的内存,你应该使用metrics interface。我猜你正在寻找 BlockManager.memory.memUsed_MB
但我可能错了。
也就是说,依靠 JVM 内存指标来触发某些工作对我来说似乎不是一个好主意,因为内存大小取决于用于保存数据的内部结构,并且不会准确反映数据的实际大小.
我宁愿根据 record count x record size
.
计算指标
一旦我们有了触发指标,我们就可以 'purge' 使用该条件收集的数据:
var addedRDD = sparkContext.emptyRDD
...
dstream.foreachRDD{ rdd =>
addedRDD = addedRDD union rdd
addedRDD.cache()
if (addedRDD.count() > SomeSize) {
writeToStorage(addedRDD)
addedRDD.unpersist(true)
addedRDD=sparkContext.emptyRDD()
}
}
(*) 代码仅供参考。
我正在尝试使用 Spark 中的持久化功能将数据持久化到内存中并对其进行计算。
就我而言,我只想保留 2GB DStream。这是我的代码:
val conf = new SparkConf()
.setAppName("File Count")
.setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val file = ssc.textFileStream("hdfs://192.168.1.31:8020/user/sparkStreaming/input")
var test = file.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
val windowed = test.reduceByKey(((a:Int,b:Int) => (a + b)))
windowed.persist(MEMORY_ONLY_SER)
当我达到2GB时,我再做一次处理,我使用unpersist来释放内存。
谁能帮我看看我坚持了多少?
如果我知道我坚持了多少,我如何将它用作传导((如果 PERSISTED == 2 GB)做治疗)?
在Spark Streaming中,persist
或cache
不会为每批数据不断地向内存中添加数据。它所做的是表明底层 RDD 应该被缓存,以便对它的进一步操作应用于 RDD 的记忆计算。所以,这不是一个累积的过程。
如果你想积累数据,你可以这样做:
var addedRDD = sparkContext.emptyRDD
...
dstream.foreachRDD{ rdd =>
addedRDD = addedRDD union rdd
addedRDD.cache()
}
这最终会给你带来麻烦,因为这个 RDD 的大小和复杂性会增加(我鼓励你在几次迭代后在 Spark UI 中检查它)
要获得使用的内存,你应该使用metrics interface。我猜你正在寻找 BlockManager.memory.memUsed_MB
但我可能错了。
也就是说,依靠 JVM 内存指标来触发某些工作对我来说似乎不是一个好主意,因为内存大小取决于用于保存数据的内部结构,并且不会准确反映数据的实际大小.
我宁愿根据 record count x record size
.
一旦我们有了触发指标,我们就可以 'purge' 使用该条件收集的数据:
var addedRDD = sparkContext.emptyRDD
...
dstream.foreachRDD{ rdd =>
addedRDD = addedRDD union rdd
addedRDD.cache()
if (addedRDD.count() > SomeSize) {
writeToStorage(addedRDD)
addedRDD.unpersist(true)
addedRDD=sparkContext.emptyRDD()
}
}
(*) 代码仅供参考。