如何在 spark 中映射 DataFrame 以提取 RowData 并使用 h2o mojo 模型进行预测
How to map over DataFrame in spark to extract RowData and make predictions using h2o mojo model
我保存了一个 mojo 格式的 h2o 模型,现在我正在尝试加载它并使用它对新数据集 (df
) 进行预测,作为用 scala 编写的 spark 应用程序的一部分。理想情况下,我希望将一个新行附加到包含基于此模型的 class 概率的现有 DataFrame。
我可以看到如何将 mojo 应用到已经采用 RowData 格式的单个行(根据回答 ),但我不确定如何映射现有的 DataFrame 以便它在使用 mojo 模型进行预测的正确格式。我使用过一些 DataFrame,但从未使用过底层 RDD。
另外,这个模型是否应该被序列化/广播,以便可以在集群上并行进行预测,或者它是否可以作为地图的一部分提供给所有执行者?
我已经走到这一步了:
# load mojo model and create easy predict model wrapper
val mojo = MojoModel.load("loca/path/to/mojo/mojo.zip")
val easyModel = new EasyPredictModelWrapper(mojo)
# map over spark DataFrame, converty to rdd, and make predictions on each row:
df.rdd.map { row =>
val prediction = easyModel.predictBinomial(row).classProbabilities
println(prediction)
}
但是我的 row
变量的格式不正确,无法正常工作。关于下一步尝试什么的任何建议?
编辑:我的 DataFrame 由 70 个预测特征列组成,它们是整数和 category/factor 列的混合体。一个非常简单的示例 DataFrame:
val df = Seq(
(0, 3, "cat1"),
(1, 2, "cat2"),
(2, 6, "cat1")
).toDF("id", "age", "category")
使用此函数准备 H2O 所需的 RowData 对象:
def rowToRowData(df: DataFrame, row: Row): RowData = {
val rowAsMap = row.getValuesMap[Any](df.schema.fieldNames)
val rowData = rowAsMap.foldLeft(new RowData()) { case (rd, (k,v)) =>
if (v != null) { rd.put(k, v.toString) }
rd
}
rowData
}
我这里有一个完整的答案:
您可以直接在 df 上调用 map 而不是在 rdd 上。
我保存了一个 mojo 格式的 h2o 模型,现在我正在尝试加载它并使用它对新数据集 (df
) 进行预测,作为用 scala 编写的 spark 应用程序的一部分。理想情况下,我希望将一个新行附加到包含基于此模型的 class 概率的现有 DataFrame。
我可以看到如何将 mojo 应用到已经采用 RowData 格式的单个行(根据回答
另外,这个模型是否应该被序列化/广播,以便可以在集群上并行进行预测,或者它是否可以作为地图的一部分提供给所有执行者?
我已经走到这一步了:
# load mojo model and create easy predict model wrapper
val mojo = MojoModel.load("loca/path/to/mojo/mojo.zip")
val easyModel = new EasyPredictModelWrapper(mojo)
# map over spark DataFrame, converty to rdd, and make predictions on each row:
df.rdd.map { row =>
val prediction = easyModel.predictBinomial(row).classProbabilities
println(prediction)
}
但是我的 row
变量的格式不正确,无法正常工作。关于下一步尝试什么的任何建议?
编辑:我的 DataFrame 由 70 个预测特征列组成,它们是整数和 category/factor 列的混合体。一个非常简单的示例 DataFrame:
val df = Seq(
(0, 3, "cat1"),
(1, 2, "cat2"),
(2, 6, "cat1")
).toDF("id", "age", "category")
使用此函数准备 H2O 所需的 RowData 对象:
def rowToRowData(df: DataFrame, row: Row): RowData = {
val rowAsMap = row.getValuesMap[Any](df.schema.fieldNames)
val rowData = rowAsMap.foldLeft(new RowData()) { case (rd, (k,v)) =>
if (v != null) { rd.put(k, v.toString) }
rd
}
rowData
}
我这里有一个完整的答案: