使用 Java 在 spark 2.0.2 中为 kyro 编码数据集构建决策树管道

Building decision tree pipeline for kyro-encoded Datasets in spark 2.0.2 with Java

我正在尝试从 Spark 2.0.2 org.apache.spark.examples.ml.JavaDecisionTreeClassificationExample 构建决策树分类示例的一个版本。我不能直接使用它,因为它使用 libsvm 编码的数据。我需要避免使用 libsvm(未记录的 AFAIK)来更轻松地对普通数据集进行分类。我正在尝试修改示例以改为使用 kyro 编码数据集。

问题源于下面的地图调用,特别是按照 SparkML feature vectors and Spark 2.0.2 Encoders in Java

的指示使用 Encoders.kyro 作为编码器的后果
    public SMLDecisionTree(Dataset<Row> incomingDS, final String label, final String[] features)
{
    this.incomingDS = incomingDS;
    this.label = label;
    this.features = features;
    this.mapSet = new StringToDoubleMapperSet(features);

    this.sdlDS = incomingDS
            .select(label, features)
            .filter(new FilterFunction<Row>()
            {
                public boolean call(Row row) throws Exception
                {
                    return !row.getString(0).equals(features[0]); // header
                }
            })
            .map(new MapFunction<Row, LabeledFeatureVector>()
            {
                public LabeledFeatureVector call(Row row) throws Exception
                {
                    double labelVal = mapSet.addValue(0, row.getString(0));
                    double[] featureVals = new double[features.length];
                    for (int i = 1; i < row.length(); i++)
                    {
                        Double val = mapSet.addValue(i, row.getString(i));
                        featureVals[i - 1] = val;
                    }
                    return new LabeledFeatureVector(labelVal, Vectors.dense(featureVals));
                }
                // 
            }, Encoders.kryo(LabeledFeatureVector.class));

    Dataset<LabeledFeatureVector>[] splits = sdlDS.randomSplit(new double[] { 0.7, 0.3 });
    this.trainingDS = splits[0];
    this.testDS = splits[1];
}

这会影响原始 spark 示例中的 StringIndexer 和 VectorIndexer,它们无法处理生成的 kyro 编码数据集。这是从 spark 决策树示例代码中获取的管道构建代码​​:

public void run() throws IOException
{
    sdlDS.show();
    StringIndexerModel labelIndexer = new StringIndexer()
            .setInputCol("label")
            .setOutputCol("indexedLabel")
            .fit(df);

    VectorIndexerModel featureIndexer = new VectorIndexer()
            .setInputCol("features")
            .setOutputCol("indexedFeatures")
            .setMaxCategories(4) // treat features with > 4 distinct values as continuous.
            .fit(df);

    DecisionTreeClassifier classifier = new DecisionTreeClassifier()
            .setLabelCol("indexedLabel")
            .setFeaturesCol("indexedFeatures");

    IndexToString labelConverter = new IndexToString()
            .setInputCol("prediction")
            .setOutputCol("predictedLabel")
            .setLabels(labelIndexer.labels());

    Pipeline pipeline = new Pipeline().setStages(new PipelineStage[]
    { labelIndexer, featureIndexer, classifier, labelConverter });

此代码显然需要一个包含 "label" 和 "features" 列的数据集,其中包含标签和双重编码特征向量。问题是 kyro 产生了一个名为 "values" 的列,它似乎包含一个字节数组。我不知道如何将其转换为原始 StringIndexer 和 VectorIndexer 所期望的文档。有人可以帮忙吗? Java请。

首先不要使用 Kryo 编码器。总的来说非常有限,在这里根本不适用。这里最简单的解决方案是删除自定义 class 并使用 Row 编码器。首先你需要一堆导入:

import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.ml.linalg.*;

和架构:

List<StructField> fields = new ArrayList<>();

fields.add(DataTypes.createStructField("label", DoubleType, false));
fields.add(DataTypes.createStructField("features", new VectorUDT(), false));
StructType schema = DataTypes.createStructType(fields);

编码器可以这样定义:

Encoder<Row> encoder = RowEncoder.apply(schema);

并使用如下所示:

Dataset<Row> inputDs = spark.read().json(sc.parallelize(Arrays.asList(
        "{\"lablel\": 1.0, \"features\": \"foo\"}"
)));

inputDs.map(new MapFunction<Row, Row>() {
    public Row call(Row row) {
        return RowFactory.create(1.0, Vectors.dense(1.0, 2.0));
    }
}, encoder);