创建一个映射来为 Spark Dataframe 的每一行调用 POJO

Create a map to call the POJO for each row of Spark Dataframe

我在 R 中构建了一个 H2O 模型并保存了 POJO 代码。我想使用 POJO 在 hdfs 中对镶木地板文件进行评分,但我不确定如何去做。我计划将 parquet 文件读入 spark (scala/SparkR/PySpark) 并在那里对其进行评分。以下是我在 H2O's documentation page.

上找到的摘录

"How do I run a POJO on a Spark Cluster?

The POJO provides just the math logic to do predictions, so you won’t find any Spark (or even H2O) specific code there. If you want to use the POJO to make predictions on a dataset in Spark, create a map to call the POJO for each row and save the result to a new column, row-by-row"

有没有人有一些示例代码可以说明我如何做到这一点?我将不胜感激任何帮助。我主要使用 R 和 SparkR 编写代码,但不确定如何将 POJO“映射”到每一行。

提前致谢。

如果你想在 spark 中使用 POJO 或 MOJO 进行评分,你应该使用 h2o-genmodel.jar class 中提供的 RowData 作为逐行输入数据来调用 easyPredict 方法来生成分数。

您的解决方案是从 HDFS 读取镶木地板文件,然后对于每一行,通过填充每个条目将其转换为 RowData 对象,然后将其传递给您的 POJO 评分函数。记住 POJO 和 MOJO 它们都使用完全相同的评分函数来评分,唯一的区别在于 POJO Class 的使用方式与 MOJO 资源 zip 包的使用方式。由于 MOJO 向后兼容并且可以与任何更新的 h2o-genmodel.jar 一起使用,因此最好使用 MOJO 而不是 POJO。

以下是完整的 Scala 代码,您可以在 Spark 上使用它来加载 MOJO 模型,然后进行评分:

import _root_.hex.genmodel.GenModel
import _root_.hex.genmodel.easy.{EasyPredictModelWrapper, RowData}
import _root_.hex.genmodel.easy.prediction
import _root_.hex.genmodel.MojoModel
import _root_.hex.genmodel.easy.RowData

// Load Mojo
val mojo = MojoModel.load("/Users/avkashchauhan/learn/customers/mojo_bin/gbm_model.zip")
val easyModel = new EasyPredictModelWrapper(mojo)

// Get Mojo Details
var features = mojo.getNames.toBuffer

// Creating the row
val r = new RowData
r.put("AGE", "68")
r.put("RACE", "2")
r.put("DCAPS", "2")
r.put("VOL", "0")
r.put("GLEASON", "6")

// Performing the Prediction
val prediction = easyModel.predictBinomial(r).classProbabilities 

Here 是在 Spark 中读取 parquet 文件然后保存为 CSV 的示例。您可以使用相同的代码从 HDFS 读取镶木地板,然后将每一行作为 RowData 传递给上面的示例。

Here是在spark中使用MOJO模型并使用RowData进行评分的详细示例。

我刚刚 post 编辑了一个实际使用 DataFrame/Dataset 的 solution。 post 使用星球大战数据集在 R 中构建模型,然后在 Spark 的测试集上对 MOJO 进行评分。我将在此处粘贴唯一相关的部分:

使用 Spark(和 Scala)评分

您可以使用 spark-submit 或 spark-shell。如果使用 spark-submit,h2o-genmodel.jar 需要放在 spark 应用程序根目录下的 lib 文件夹下,以便在编译时将其添加为依赖项。以下代码假定您是 运行 spark-shell。为了使用 h2o-genmodel.jar,您需要在启动 spark-shell 时通过提供 --jar 标志附加 jar 文件。例如:

/usr/lib/spark/bin/spark-shell \
--conf spark.serializer="org.apache.spark.serializer.KryoSerializer" \
--conf spark.driver.memory="3g" \
--conf spark.executor.memory="10g" \
--conf spark.executor.instances=10 \
--conf spark.executor.cores=4 \
--jars /path/to/h2o-genmodel.jar

现在在 Spark shell 中导入依赖项

import _root_.hex.genmodel.easy.{EasyPredictModelWrapper, RowData}
import _root_.hex.genmodel.MojoModel

使用 DataFrame

val modelPath = "/path/to/zip/file"
val dataPath = "/path/to/test/data"

// Import data
val dfStarWars = spark.read.option("header", "true").csv(dataPath)
// Import MOJO model
val mojo = MojoModel.load(modelPath)
val easyModel = new EasyPredictModelWrapper(mojo)

// score
val dfScore = dfStarWars.map {
  x =>
    val r = new RowData
    r.put("height", x.getAs[String](1))
    r.put("mass", x.getAs[String](2))
    val score = easyModel.predictBinomial(r).classProbabilities
    (x.getAs[String](0), score(1))
}.toDF("name", "isHumanScore")

变量 score 是级别 0 和 1 的两个分数的列表。score(1) 是级别 1 的分数,即 "human"。默认情况下,映射函数 returns 具有未指定列名“_1”、“_2”等的 DataFrame。您可以通过调用 toDF 重命名列。

使用数据集

要使用数据集 API 我们只需要创建两种情况 classes,一种用于输入数据,另一种用于输出。

case class StarWars (
  name: String,
  height: String,
  mass: String,
  is_human: String
)

case class Score (
  name: String,
  isHumanScore: Double
)


// Dataset
val dtStarWars = dfStarWars.as[StarWars]
val dtScore = dtStarWars.map {
  x =>
    val r = new RowData
    r.put("height", x.height)
    r.put("mass", x.mass)
    val score = easyModel.predictBinomial(r).classProbabilities
    Score(x.name, score(1))
}

使用数据集,您可以通过直接调用 x.columnName 来获取列的值。请注意,列值的类型必须是字符串,因此如果它们是 class.

案例中定义的其他类型,您可能需要手动转换它们