加载 RandomForestModel 以在 Spark Streaming 中使用
Load RandomForestModel for use in Spark Streaming
我使用 Spark mllib 1.3.0
训练并保存了一个相当繁重的随机森林模型(parquet 格式)
我想将它用于 Spark 流作业的实时分类,但遇到的困难很少,主要与模型的大小和将其发送给工作人员的必要性有关。
目前,我看到 3 个解决方案,none 其中最理想的是:
在驱动程序端加载它,在每个计算的微批次上将它发送给工作人员。示例代码:
val model = RandomForestModel.load(sc, path)
stream.map(smthg => model.predict(...))
这里的问题是它在每个批次上序列化和发送模型。有关信息,我必须将 spark.akka.frameSize 设置为大于 50MB 才能无错误地执行。这显然是不可持续的
在每个工人上加载模型。在 Spark 1.3 中,似乎无法获取当前的 SparkContext,因此我必须为每个 worker 创建一个新的才能加载模型。这意味着集群需要额外的 CPU 来进行第一次微批处理/作业迭代
在驱动端执行预测:
val model = RandomForestModel.load(sc, path)
stream
.map(smthg => stuff)
.foreachRDD(rdd => model.predict(rdd))
缺点是无法很好地扩展,因为一切都发生在驱动端。
首先,最后一种方法不会在驱动程序上执行,实际上是 no different than the first one。
def predict(features: RDD[Vector]): RDD[Double] = features.map(x => predict(x))
随机森林模型不是分布式和可序列化的,因此您可以使用标准工具编写:
import java.io._
import org.apache.spark.mllib.tree.model.RandomForestModel
val model: RandomForestModel = ???
val os = new ObjectOutputStream(new FileOutputStream("/tmp/rf"))
os.writeObject(model)
os.close()
稍后阅读:
val is = new ObjectInputStream(new FileInputStream("/tmp/rf")
val model = is.readObject().asInstanceOf[RandomForestModel]
is.close
但这种方法不可移植到新的 ML API,因此在实践中,调整管道以减小模型大小更有意义。
我使用 Spark mllib 1.3.0
训练并保存了一个相当繁重的随机森林模型(parquet 格式)我想将它用于 Spark 流作业的实时分类,但遇到的困难很少,主要与模型的大小和将其发送给工作人员的必要性有关。
目前,我看到 3 个解决方案,none 其中最理想的是:
在驱动程序端加载它,在每个计算的微批次上将它发送给工作人员。示例代码:
val model = RandomForestModel.load(sc, path) stream.map(smthg => model.predict(...))
这里的问题是它在每个批次上序列化和发送模型。有关信息,我必须将 spark.akka.frameSize 设置为大于 50MB 才能无错误地执行。这显然是不可持续的
在每个工人上加载模型。在 Spark 1.3 中,似乎无法获取当前的 SparkContext,因此我必须为每个 worker 创建一个新的才能加载模型。这意味着集群需要额外的 CPU 来进行第一次微批处理/作业迭代
在驱动端执行预测:
val model = RandomForestModel.load(sc, path) stream .map(smthg => stuff) .foreachRDD(rdd => model.predict(rdd))
缺点是无法很好地扩展,因为一切都发生在驱动端。
首先,最后一种方法不会在驱动程序上执行,实际上是 no different than the first one。
def predict(features: RDD[Vector]): RDD[Double] = features.map(x => predict(x))
随机森林模型不是分布式和可序列化的,因此您可以使用标准工具编写:
import java.io._
import org.apache.spark.mllib.tree.model.RandomForestModel
val model: RandomForestModel = ???
val os = new ObjectOutputStream(new FileOutputStream("/tmp/rf"))
os.writeObject(model)
os.close()
稍后阅读:
val is = new ObjectInputStream(new FileInputStream("/tmp/rf")
val model = is.readObject().asInstanceOf[RandomForestModel]
is.close
但这种方法不可移植到新的 ML API,因此在实践中,调整管道以减小模型大小更有意义。