如何在 Spark Streaming 上保持固定大小

How to persist a fixed size on spark streaming

我正在尝试使用 Spark 中的持久化功能将数据持久化到内存中并对其进行计算。
就我而言,我只想保留 2GB DStream。这是我的代码:

val conf = new SparkConf()
    .setAppName("File Count")
    .setMaster("local[2]")

    val sc = new SparkContext(conf)    
  val ssc = new StreamingContext(sc, Seconds(10))
  val file = ssc.textFileStream("hdfs://192.168.1.31:8020/user/sparkStreaming/input")
  var test = file.map(x => (x.split(" ")(0)+";"+x.split(" ")(1), 1)).reduceByKey((x,y) => x+y)
  val windowed = test.reduceByKey(((a:Int,b:Int) => (a + b)))
  windowed.persist(MEMORY_ONLY_SER)

当我达到2GB时,我再做一次处理,我使用unpersist来释放内存。
谁能帮我看看我坚持了多少?
如果我知道我坚持了多少,我如何将它用作传导((如果 PERSISTED == 2 GB)做治疗)?

在Spark Streaming中,persistcache不会为每批数据不断地向内存中添加数据。它所做的是表明底层 RDD 应该被缓存,以便对它的进一步操作应用于 RDD 的记忆计算。所以,这不是一个累积的过程。 如果你想积累数据,你可以这样做:

var addedRDD = sparkContext.emptyRDD
...
dstream.foreachRDD{ rdd =>
  addedRDD = addedRDD union rdd
  addedRDD.cache()
}

这最终会给你带来麻烦,因为这个 RDD 的大小和复杂性会增加(我鼓励你在几次迭代后在 Spark UI 中检查它)

要获得使用的内存,你应该使用metrics interface。我猜你正在寻找 BlockManager.memory.memUsed_MB 但我可能错了。

也就是说,依靠 JVM 内存指标来触发某些工作对我来说似乎不是一个好主意,因为内存大小取决于用于保存数据的内部结构,并且不会准确反映数据的实际大小. 我宁愿根据 record count x record size.

计算指标

一旦我们有了触发指标,我们就可以 'purge' 使用该条件收集的数据:

var addedRDD = sparkContext.emptyRDD
...
dstream.foreachRDD{ rdd =>
  addedRDD = addedRDD union rdd
  addedRDD.cache()
  if (addedRDD.count() > SomeSize) {
      writeToStorage(addedRDD)
      addedRDD.unpersist(true)
      addedRDD=sparkContext.emptyRDD()
  }
}

(*) 代码仅供参考。