可以在 Spark 批处理上创建模型并在 Spark 流中使用它吗?
Can a model be created on Spark batch and use it in Spark streaming?
我可以在 spark batch 中创建模型并在 Spark streaming 上使用它进行实时处理吗?
我在 Apache Spark 站点上看到了各种示例,其中训练和预测都基于相同类型的处理(线性回归)。
Can I create a model in spark batch and use it on Spark streaming for real-time processing?
当然可以。在 Spark 社区中,他们称之为离线训练在线预测。 spark 中的许多训练算法允许您将模型保存在文件系统 HDFS/S3 上。流应用程序可以加载相同的模型。您只需调用模型的预测方法即可进行预测。
请参阅 this link 中的 Streaming + MLLib 部分。
例如,如果您想离线训练决策树并在线进行预测...
在批量应用中 -
val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,impurity, maxDepth, maxBins)
model.save(sc, "target/tmp/myDecisionTreeClassificationModel")
在流媒体应用程序中 -
val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")
sameModel.predict(newData)
这是我刚刚实施的另一种解决方案。
我在 spark-Batch 中创建了一个模型。
假设最终的模型对象名称是 regmodel.
final LinearRegressionModel regmodel =algorithm.run(JavaRDD.toRDD(parsedData));
并且 spark 上下文名称是 sc as
JavaSparkContext sc = new JavaSparkContext(sparkConf);
现在在相同的代码中,我正在使用相同的 sc 创建火花流
final JavaStreamingContext jssc = new JavaStreamingContext(sc,new Duration(Integer.parseInt(conf.getWindow().trim())));
并像这样进行预测:
JavaPairDStream<Double, Double> predictvalue = dist1.mapToPair(new PairFunction<LabeledPoint, Double,Double>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Double, Double> call(LabeledPoint v1) throws Exception {
Double p = v1.label();
Double q = regmodel.predict(v1.features());
return new Tuple2<Double, Double>(p,q);
}
});
我可以在 spark batch 中创建模型并在 Spark streaming 上使用它进行实时处理吗?
我在 Apache Spark 站点上看到了各种示例,其中训练和预测都基于相同类型的处理(线性回归)。
Can I create a model in spark batch and use it on Spark streaming for real-time processing?
当然可以。在 Spark 社区中,他们称之为离线训练在线预测。 spark 中的许多训练算法允许您将模型保存在文件系统 HDFS/S3 上。流应用程序可以加载相同的模型。您只需调用模型的预测方法即可进行预测。
请参阅 this link 中的 Streaming + MLLib 部分。
例如,如果您想离线训练决策树并在线进行预测...
在批量应用中 -
val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,impurity, maxDepth, maxBins)
model.save(sc, "target/tmp/myDecisionTreeClassificationModel")
在流媒体应用程序中 -
val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")
sameModel.predict(newData)
这是我刚刚实施的另一种解决方案。
我在 spark-Batch 中创建了一个模型。 假设最终的模型对象名称是 regmodel.
final LinearRegressionModel regmodel =algorithm.run(JavaRDD.toRDD(parsedData));
并且 spark 上下文名称是 sc as
JavaSparkContext sc = new JavaSparkContext(sparkConf);
现在在相同的代码中,我正在使用相同的 sc 创建火花流
final JavaStreamingContext jssc = new JavaStreamingContext(sc,new Duration(Integer.parseInt(conf.getWindow().trim())));
并像这样进行预测:
JavaPairDStream<Double, Double> predictvalue = dist1.mapToPair(new PairFunction<LabeledPoint, Double,Double>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Double, Double> call(LabeledPoint v1) throws Exception {
Double p = v1.label();
Double q = regmodel.predict(v1.features());
return new Tuple2<Double, Double>(p,q);
}
});