Spark ML - 从新数据元素创建一个特征向量来预测
Spark ML - create a features vector from new data element to predict on
tl;博士
我在 Spark 2.10 中拟合了一个 LinearRegression 模型 - 在使用 StringIndexer 和 OneHotEncoder 之后,我有一个约 44 个元素的特征向量。对于我想要预测的新数据位,如何从新数据元素创建特征向量?
更多详情
首先,这是一个完全人为的例子来学习如何做到这一点。使用包含以下字段的日志:
"elapsed_time", "api_name", "method", and "status_code"
我们将创建一个标签模型 elapsed_time
并将其他字段用作我们的特征集。下面分享完整代码
步骤 - 压缩
- 将我们的数据读入 DataFrame
- 使用 StringIndexer 索引我们的每个特征
- OneHotEncode 使用 OneHotEncoder 索引特征
- 使用 VectorAssembler 创建我们的特征向量
- 将数据拆分为训练集和测试集
- 拟合模型并预测测试数据
结果很糟糕,但正如我所说,这是一个人为的练习...
我需要学习如何做
例如,如果一个新的日志条目进入流式应用程序,我将如何从新数据创建一个特征向量并将其传递给 predict()?
新的日志条目可能是:
{api_name":"/sample_api_1/v2","method":"GET","status_code":"200","elapsed_time":39 }
Post VectorAssembler
status_code_vector
(14,[0],[1.0])
api_name_vector
(27,[0],[1.0])
method_vector
(3,[0],[1.0])
特征向量
(44,[0,14,41],[1.0,1.0,1.0])
乐码
%spark
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler, StringIndexerModel, VectorSlicer}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.DataFrame
val logs = sc.textFile("/Users/z001vmk/data/sample_102M.txt")
val dfLogsRaw: DataFrame = spark.read.json(logs)
val dfLogsFiltered = dfLogsRaw.filter("status_code != 314").drop("extra_column")
// Create DF with our fields of concern.
val dfFeatures: DataFrame = dfLogsFiltered.select("elapsed_time", "api_name", "method", "status_code")
// Contrived goal:
// Use elapsed time as our label given features api_name, status_code, & method.
// Train model on small (100Mb) dataset
// Be able to predict elapsed_time given a new record similar to this example:
// --> {api_name":"/sample_api_1/v2","method":"GET","status_code":"200","elapsed_time":39}
// Indexers
val statusCodeIdxr: StringIndexer = new StringIndexer().setInputCol("status_code").setOutputCol("status_code_idx").setHandleInvalid("skip")
val apiNameIdxr: StringIndexer = new StringIndexer().setInputCol("api_name").setOutputCol("api_name_idx").setHandleInvalid("skip")
val methodIdxr: StringIndexer = new StringIndexer().setInputCol("method").setOutputCol("method_idx").setHandleInvalid("skip")
// Index features:
val dfIndexed0: DataFrame = statusCodeIdxr.fit(dfFeatures).transform(dfFeatures)
val dfIndexed1: DataFrame = apiNameIdxr.fit(dfIndexed0).transform(dfIndexed0)
val indexed: DataFrame = methodIdxr.fit(dfIndexed1).transform(dfIndexed1)
// OneHotEncoders
val statusCodeEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(statusCodeIdxr.getOutputCol).setOutputCol("status_code_vec")
val apiNameEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(apiNameIdxr.getOutputCol).setOutputCol("api_name_vec")
val methodEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(methodIdxr.getOutputCol).setOutputCol("method_vec")
// Encode feature vectors
val encoded0: DataFrame = statusCodeEncoder.transform(indexed)
val encoded1: DataFrame = apiNameEncoder.transform(encoded0)
val encoded: DataFrame = methodEncoder.transform(encoded1)
// Limit our dataset to necessary elements:
val dataset0 = encoded.select("elapsed_time", "status_code_vec", "api_name_vec", "method_vec").withColumnRenamed("elapsed_time", "label")
// Assemble feature vectors
val assembler: VectorAssembler = new VectorAssembler().setInputCols(Array("status_code_vec", "api_name_vec", "method_vec")).setOutputCol("features")
val dataset1 = assembler.transform(dataset0)
dataset1.show(5,false)
// Prepare the dataset for training (optional):
val dataset: DataFrame = dataset1.select("label", "features")
dataset.show(3,false)
val Array(training, test) = dataset.randomSplit(Array(0.8, 0.2))
// Create our Linear Regression Model
val lr: LinearRegression = new LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8).setLabelCol("label").setFeaturesCol("features")
val lrModel = lr.fit(training)
val predictions = lrModel.transform(test)
predictions.show(20,false)
如果您有兴趣,可以将这些全部粘贴到 Zeppelin 笔记本中。
总结
所以,我一直在寻找的是如何将新数据转换为大约 35 个元素的特征向量,并使用适合训练数据的模型对其进行转换并获得预测。我怀疑元数据要么保存在模型本身中,要么在这种情况下需要从 StringIndexer 维护 - 但这是我找不到的。
很高兴被指向文档或示例 - 所有帮助表示赞赏。
谢谢!
简答:管道模型。
不过,为了确保您理解,您不希望在启动应用程序时创建您的模型,如果没有必要的话。除非您打算使用数据集和反馈,否则这很愚蠢。在 Spark 提交会话(或使用 Zeppelin 之类的笔记本会话)中创建模型并将其保存下来。那就是你的数据科学。
大多数 DS 人员将模型交给 DevOps/Data 工程师使用。他们所要做的就是在对象加载到内存后调用 .predict()。
在使用 PipelineModel 之后,这变得非常简单。向@tadamhicks 致敬,感谢他让我尽早了解管道。
下面是一个更新的代码块,它执行与上面基本相同的模型创建、拟合和预测,但使用管道执行此操作,并添加了一个位,我们在新创建的 DataFrame 上进行预测,以模拟如何预测新的 DataFrame数据.
rename/create 我们的标签列可能有更简洁的方法,但我们会将其留作未来的增强。
%spark
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler, StringIndexerModel, VectorSlicer}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.DataFrame
val logs = sc.textFile("/data/sample_102M.txt")
val dfLogsRaw: DataFrame = spark.read.json(logs)
val dfLogsFiltered = dfLogsRaw.filter("status_code != 314").drop("extra_column")
.select("elapsed_time", "api_name", "method", "status_code","cache_status")
.withColumnRenamed("elapsed_time", "label")
val Array(training, test) = dfLogsFiltered.randomSplit(Array(0.8, 0.2))
// Indexers
val statusCodeIdxr: StringIndexer = new StringIndexer().setInputCol("status_code").setOutputCol("status_code_idx").setHandleInvalid("skip")
val apiNameIdxr: StringIndexer = new StringIndexer().setInputCol("api_name").setOutputCol("api_name_idx").setHandleInvalid("skip")
val methodIdxr: StringIndexer = new StringIndexer().setInputCol("method").setOutputCol("method_idx").setHandleInvalid("skip")//"cache_status"
val cacheStatusIdxr: StringIndexer = new StringIndexer().setInputCol("cache_status").setOutputCol("cache_status_idx").setHandleInvalid("skip")
// OneHotEncoders
val statusCodeEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(statusCodeIdxr.getOutputCol).setOutputCol("status_code_vec")
val apiNameEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(apiNameIdxr.getOutputCol).setOutputCol("api_name_vec")
val methodEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(methodIdxr.getOutputCol).setOutputCol("method_vec")
val cacheStatusEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(cacheStatusIdxr.getOutputCol).setOutputCol("cache_status_vec")
// Vector Assembler
val assembler: VectorAssembler = new VectorAssembler().setInputCols(Array("status_code_vec", "api_name_vec", "method_vec", "cache_status_vec")).setOutputCol("features")
val lr: LinearRegression = new LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8).setLabelCol("label").setFeaturesCol("features")
val pipeline = new Pipeline().setStages(Array(statusCodeIdxr, apiNameIdxr, methodIdxr, cacheStatusIdxr, statusCodeEncoder, apiNameEncoder, methodEncoder, cacheStatusEncoder, assembler, lr))
val plModel: PipelineModel = pipeline.fit(training)
plModel.write.overwrite().save("/tmp/spark-linear-regression-model")
plModel.transform(test).select("label", "prediction").show(5,false)
val dataElement: String = """{"api_name":"/sample_api/v2","method":"GET","status_code":"200","cache_status":"MISS","elapsed_time":39}"""
val newDataRDD = spark.sparkContext.makeRDD(dataElement :: Nil)
val newData = spark.read.json(newDataRDD).withColumnRenamed("elapsed_time", "label")
val loadedPlModel = PipelineModel.load("/tmp/spark-linear-regression-model")
loadedPlModel.transform(newData).select("label", "prediction").show
tl;博士
我在 Spark 2.10 中拟合了一个 LinearRegression 模型 - 在使用 StringIndexer 和 OneHotEncoder 之后,我有一个约 44 个元素的特征向量。对于我想要预测的新数据位,如何从新数据元素创建特征向量?
更多详情
首先,这是一个完全人为的例子来学习如何做到这一点。使用包含以下字段的日志:
"elapsed_time", "api_name", "method", and "status_code"
我们将创建一个标签模型 elapsed_time
并将其他字段用作我们的特征集。下面分享完整代码
步骤 - 压缩
- 将我们的数据读入 DataFrame
- 使用 StringIndexer 索引我们的每个特征
- OneHotEncode 使用 OneHotEncoder 索引特征
- 使用 VectorAssembler 创建我们的特征向量
- 将数据拆分为训练集和测试集
- 拟合模型并预测测试数据
结果很糟糕,但正如我所说,这是一个人为的练习...
我需要学习如何做
例如,如果一个新的日志条目进入流式应用程序,我将如何从新数据创建一个特征向量并将其传递给 predict()?
新的日志条目可能是:
{api_name":"/sample_api_1/v2","method":"GET","status_code":"200","elapsed_time":39 }
Post VectorAssembler
status_code_vector
(14,[0],[1.0])
api_name_vector
(27,[0],[1.0])
method_vector
(3,[0],[1.0])
特征向量
(44,[0,14,41],[1.0,1.0,1.0])
乐码
%spark
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler, StringIndexerModel, VectorSlicer}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.DataFrame
val logs = sc.textFile("/Users/z001vmk/data/sample_102M.txt")
val dfLogsRaw: DataFrame = spark.read.json(logs)
val dfLogsFiltered = dfLogsRaw.filter("status_code != 314").drop("extra_column")
// Create DF with our fields of concern.
val dfFeatures: DataFrame = dfLogsFiltered.select("elapsed_time", "api_name", "method", "status_code")
// Contrived goal:
// Use elapsed time as our label given features api_name, status_code, & method.
// Train model on small (100Mb) dataset
// Be able to predict elapsed_time given a new record similar to this example:
// --> {api_name":"/sample_api_1/v2","method":"GET","status_code":"200","elapsed_time":39}
// Indexers
val statusCodeIdxr: StringIndexer = new StringIndexer().setInputCol("status_code").setOutputCol("status_code_idx").setHandleInvalid("skip")
val apiNameIdxr: StringIndexer = new StringIndexer().setInputCol("api_name").setOutputCol("api_name_idx").setHandleInvalid("skip")
val methodIdxr: StringIndexer = new StringIndexer().setInputCol("method").setOutputCol("method_idx").setHandleInvalid("skip")
// Index features:
val dfIndexed0: DataFrame = statusCodeIdxr.fit(dfFeatures).transform(dfFeatures)
val dfIndexed1: DataFrame = apiNameIdxr.fit(dfIndexed0).transform(dfIndexed0)
val indexed: DataFrame = methodIdxr.fit(dfIndexed1).transform(dfIndexed1)
// OneHotEncoders
val statusCodeEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(statusCodeIdxr.getOutputCol).setOutputCol("status_code_vec")
val apiNameEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(apiNameIdxr.getOutputCol).setOutputCol("api_name_vec")
val methodEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(methodIdxr.getOutputCol).setOutputCol("method_vec")
// Encode feature vectors
val encoded0: DataFrame = statusCodeEncoder.transform(indexed)
val encoded1: DataFrame = apiNameEncoder.transform(encoded0)
val encoded: DataFrame = methodEncoder.transform(encoded1)
// Limit our dataset to necessary elements:
val dataset0 = encoded.select("elapsed_time", "status_code_vec", "api_name_vec", "method_vec").withColumnRenamed("elapsed_time", "label")
// Assemble feature vectors
val assembler: VectorAssembler = new VectorAssembler().setInputCols(Array("status_code_vec", "api_name_vec", "method_vec")).setOutputCol("features")
val dataset1 = assembler.transform(dataset0)
dataset1.show(5,false)
// Prepare the dataset for training (optional):
val dataset: DataFrame = dataset1.select("label", "features")
dataset.show(3,false)
val Array(training, test) = dataset.randomSplit(Array(0.8, 0.2))
// Create our Linear Regression Model
val lr: LinearRegression = new LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8).setLabelCol("label").setFeaturesCol("features")
val lrModel = lr.fit(training)
val predictions = lrModel.transform(test)
predictions.show(20,false)
如果您有兴趣,可以将这些全部粘贴到 Zeppelin 笔记本中。
总结
所以,我一直在寻找的是如何将新数据转换为大约 35 个元素的特征向量,并使用适合训练数据的模型对其进行转换并获得预测。我怀疑元数据要么保存在模型本身中,要么在这种情况下需要从 StringIndexer 维护 - 但这是我找不到的。
很高兴被指向文档或示例 - 所有帮助表示赞赏。
谢谢!
简答:管道模型。
不过,为了确保您理解,您不希望在启动应用程序时创建您的模型,如果没有必要的话。除非您打算使用数据集和反馈,否则这很愚蠢。在 Spark 提交会话(或使用 Zeppelin 之类的笔记本会话)中创建模型并将其保存下来。那就是你的数据科学。
大多数 DS 人员将模型交给 DevOps/Data 工程师使用。他们所要做的就是在对象加载到内存后调用 .predict()。
在使用 PipelineModel 之后,这变得非常简单。向@tadamhicks 致敬,感谢他让我尽早了解管道。
下面是一个更新的代码块,它执行与上面基本相同的模型创建、拟合和预测,但使用管道执行此操作,并添加了一个位,我们在新创建的 DataFrame 上进行预测,以模拟如何预测新的 DataFrame数据.
rename/create 我们的标签列可能有更简洁的方法,但我们会将其留作未来的增强。
%spark
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler, StringIndexerModel, VectorSlicer}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.DataFrame
val logs = sc.textFile("/data/sample_102M.txt")
val dfLogsRaw: DataFrame = spark.read.json(logs)
val dfLogsFiltered = dfLogsRaw.filter("status_code != 314").drop("extra_column")
.select("elapsed_time", "api_name", "method", "status_code","cache_status")
.withColumnRenamed("elapsed_time", "label")
val Array(training, test) = dfLogsFiltered.randomSplit(Array(0.8, 0.2))
// Indexers
val statusCodeIdxr: StringIndexer = new StringIndexer().setInputCol("status_code").setOutputCol("status_code_idx").setHandleInvalid("skip")
val apiNameIdxr: StringIndexer = new StringIndexer().setInputCol("api_name").setOutputCol("api_name_idx").setHandleInvalid("skip")
val methodIdxr: StringIndexer = new StringIndexer().setInputCol("method").setOutputCol("method_idx").setHandleInvalid("skip")//"cache_status"
val cacheStatusIdxr: StringIndexer = new StringIndexer().setInputCol("cache_status").setOutputCol("cache_status_idx").setHandleInvalid("skip")
// OneHotEncoders
val statusCodeEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(statusCodeIdxr.getOutputCol).setOutputCol("status_code_vec")
val apiNameEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(apiNameIdxr.getOutputCol).setOutputCol("api_name_vec")
val methodEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(methodIdxr.getOutputCol).setOutputCol("method_vec")
val cacheStatusEncoder: OneHotEncoder = new OneHotEncoder().setInputCol(cacheStatusIdxr.getOutputCol).setOutputCol("cache_status_vec")
// Vector Assembler
val assembler: VectorAssembler = new VectorAssembler().setInputCols(Array("status_code_vec", "api_name_vec", "method_vec", "cache_status_vec")).setOutputCol("features")
val lr: LinearRegression = new LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8).setLabelCol("label").setFeaturesCol("features")
val pipeline = new Pipeline().setStages(Array(statusCodeIdxr, apiNameIdxr, methodIdxr, cacheStatusIdxr, statusCodeEncoder, apiNameEncoder, methodEncoder, cacheStatusEncoder, assembler, lr))
val plModel: PipelineModel = pipeline.fit(training)
plModel.write.overwrite().save("/tmp/spark-linear-regression-model")
plModel.transform(test).select("label", "prediction").show(5,false)
val dataElement: String = """{"api_name":"/sample_api/v2","method":"GET","status_code":"200","cache_status":"MISS","elapsed_time":39}"""
val newDataRDD = spark.sparkContext.makeRDD(dataElement :: Nil)
val newData = spark.read.json(newDataRDD).withColumnRenamed("elapsed_time", "label")
val loadedPlModel = PipelineModel.load("/tmp/spark-linear-regression-model")
loadedPlModel.transform(newData).select("label", "prediction").show