java.lang.ClassCastException: org.apache.spark.mllib.linalg.DenseVector 无法转换为 org.apache.spark.api.java.JavaRDD

java.lang.ClassCastException: org.apache.spark.mllib.linalg.DenseVector cannot be cast to org.apache.spark.api.java.JavaRDD

    SparkConf sparkConf = new SparkConf().setAppName("SummaryStatistics");  
    JavaSparkContext spark = new JavaSparkContext(sparkConf);

    JavaRDD<String> textFile = spark.textFile(args[0]); 

    JavaRDD<Vector> points = textFile.map(new ParsePoint());

    RowMatrix mat = new RowMatrix(points.rdd());
    MultivariateStatisticalSummary summary = mat.computeColumnSummaryStatistics();

    System.out.println(summary.mean()); 

    JavaRDD<Vector> result=(JavaRDD<Vector>) summary.mean(); // ***** Throwing error****
    result.saveAsTextFile(args[1]);

我们如何将结果 summary.mean() 存储在文件中。上述方法(在向量 RDD 中转换 summary.mean() )不起作用并给出该异常。

您不能只将任何类型 X 的对象 转换为 JavaRDD<X>。您需要使用 SparkContext 的 parallelize 方法创建一个 RDD。所以——如果你真的想用 Spark 来保存一个单向量,你可以通过创建一个基于单记录集合的 RDD 来实现:

List<Vector> oneItemList = new LinkedList<>();
oneItemList.add(summary.mean());
JavaRDD<Vector> result = spark.parallelize(oneItemList);
result.saveAsTextFile(args[1]);

但这有点矫枉过正(使用 Spark 保存一条记录)。

或者,您可以使用 HDFS API 保存 HDFS 文件,例如:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

Path path = new Path(args[1]);
Configuration conf = new Configuration(); // set your HDFS properties if needed
FileSystem fileSystem = FileSystem.get(conf); 
// (assuming Java 7 or higher)
try (FSDataOutputStream out = fileSystem.create(path)) {
    out.writeBytes(summary.mean().toString());
    out.flush();
}

注意:示例使用 Java 7、Hadoop V2.4、Spark V1.5.2 - 但 APIs 是稳定的,因此对于其他最新版本应该不会有太大变化。