Apache Spark:在传入 DStreams/DataFrames 上应用现有的 mllib 模型
Apache Spark: Apply existing mllib model on Incoming DStreams/DataFrames
使用 Apache Spark 的 mllib,我有一个存储在 HDFS 中的逻辑回归模型。此逻辑回归模型是根据来自某些传感器的历史数据进行训练的。
我有另一个 spark 程序使用来自这些传感器的流数据。我希望能够使用预先存在的训练模型对传入的数据流进行预测。注意:我不希望我的模型被这些数据更新。
要加载训练模型,我必须在我的代码中使用以下行:
val logisticModel = LogisticRegressionModel.load(sc, <location of model>)
sc:火花上下文。
但是,此应用程序是流式应用程序,因此已经有 "StreamingContext" 设置。现在,根据我的阅读,在同一个程序中有两个上下文是不好的做法(即使这是可能的)。
这是否意味着我的方法是错误的,我不能做我想做的事?
此外,如果我继续将流数据存储在一个文件中并对其进行 运行 逻辑回归而不是尝试直接在流应用程序中执行它是否更有意义?
StreamingContext
可以通过几种方式创建,包括两个采用现有 SparkContext
:
的构造函数
StreamingContext(path: String, sparkContext: SparkContext)
- 其中 path
是检查点文件的路径
StreamingContext(sparkContext: SparkContext, batchDuration: Duration)
因此您可以简单地创建 SparkContext
,加载所需的模型,然后创建 StreamingContext
:
val sc: SparkContext = ???
...
val ssc = new StreamingContext(sc, Seconds(1))
您还可以使用StreamingContext.sparkContext
方法获得SparkContext
:
val ssc: StreamingContext = ???
ssc.sparkContext: SparkContext
使用 Apache Spark 的 mllib,我有一个存储在 HDFS 中的逻辑回归模型。此逻辑回归模型是根据来自某些传感器的历史数据进行训练的。
我有另一个 spark 程序使用来自这些传感器的流数据。我希望能够使用预先存在的训练模型对传入的数据流进行预测。注意:我不希望我的模型被这些数据更新。
要加载训练模型,我必须在我的代码中使用以下行:
val logisticModel = LogisticRegressionModel.load(sc, <location of model>)
sc:火花上下文。
但是,此应用程序是流式应用程序,因此已经有 "StreamingContext" 设置。现在,根据我的阅读,在同一个程序中有两个上下文是不好的做法(即使这是可能的)。
这是否意味着我的方法是错误的,我不能做我想做的事?
此外,如果我继续将流数据存储在一个文件中并对其进行 运行 逻辑回归而不是尝试直接在流应用程序中执行它是否更有意义?
StreamingContext
可以通过几种方式创建,包括两个采用现有 SparkContext
:
StreamingContext(path: String, sparkContext: SparkContext)
- 其中path
是检查点文件的路径StreamingContext(sparkContext: SparkContext, batchDuration: Duration)
因此您可以简单地创建 SparkContext
,加载所需的模型,然后创建 StreamingContext
:
val sc: SparkContext = ???
...
val ssc = new StreamingContext(sc, Seconds(1))
您还可以使用StreamingContext.sparkContext
方法获得SparkContext
:
val ssc: StreamingContext = ???
ssc.sparkContext: SparkContext