当 DataFrame 有列时如何使用 Java Apache Spark MLlib?

How to work with Java Apache Spark MLlib when DataFrame has columns?

所以我是 Apache Spark 的新手,我有一个如下所示的文件:

Name     Size    Records 
File1    1,000   104,370 
File2    950     91,780 
File3    1,500   109,123 
File4    2,170   113,888
File5    2,000   111,974
File6    1,820   110,666
File7    1,200   106,771 
File8    1,500   108,991 
File9    1,000   104,007
File10   1,300   107,037
File11   1,900   111,109
File12   1,430   108,051
File13   1,780   110,006
File14   2,010   114,449
File15   2,017   114,889

这是我的 sample/test 数据。我正在开发一个异常检测程序,我必须测试具有相同格式但不同值的其他文件,并检测哪些文件的大小和记录值存在异常(如果另一个文件上的 size/records 与标准一个,或者如果大小和记录彼此不成比例)。我决定开始尝试不同的 ML 算法,我想从 k-Means 方法开始。我尝试将此文件放在以下行中:

KMeansModel model = kmeans.fit(file)

文件已解析为数据集变量。但是我收到一个错误,我很确定它与文件的 structure/schema 有关。尝试适应模型时,有没有办法处理 structured/labeled/organized 数据?

我收到以下错误:线程 "main" java.lang.IllegalArgumentException 中出现异常:字段 "features" 不存在。

这是代码:

public class practice {

public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Anomaly Detection").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    SparkSession spark = SparkSession
              .builder()
              .appName("Anomaly Detection")
              .getOrCreate();

String day1 = "C:\Users\ZK0GJXO\Documents\day1.txt";

    Dataset<Row> df = spark.read().
            option("header", "true").
            option("delimiter", "\t").
            csv(day1);
    df.show();
    KMeans kmeans = new KMeans().setK(2).setSeed(1L);
    KMeansModel model = kmeans.fit(df);
}

}

谢谢

默认情况下,所有 Spark ML 模型都在名为 "features" 的列上进行训练。可以通过 setFeaturesCol 方法指定不同的输入列名称 http://spark.apache.org/docs/latest/api/java/org/apache/spark/ml/clustering/KMeans.html#setFeaturesCol(java.lang.String)

更新:

可以使用 VectorAssembler 将多列组合成一个特征向量:

VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[]{"size", "records"})
.setOutputCol("features");

 Dataset<Row> vectorized_df = assembler.transform(df)

 KMeans kmeans = new KMeans().setK(2).setSeed(1L);
 KMeansModel model = kmeans.fit(vectorized_df);

可以使用管道进一步简化和链接这些特征转换 API https://spark.apache.org/docs/latest/ml-pipeline.html#example-pipeline