如何根据if条件跳过spark rdd map操作中的行

How to skip line in spark rdd map action based on if condition

我有一个文件,我想把它交给 mllib 算法。所以我按照这个例子做了类似的事情:

val data = sc.textFile(my_file).
    map {line =>

        val parts = line.split(",");
        Vectors.dense(parts.slice(1, parts.length).map(x => x.toDouble).toArray)
};

这很有效,只是有时我缺少一个功能。有时某行的一列没有任何数据,我想像这样扔掉这些行。

所以我想做这样的事情map{line => if(containsMissing(line) == true){ skipLine} else{ ... //same as before}}

如何执行此 skipLine 操作?

您可以使用filter函数来过滤掉这样的行:

val data = sc.textFile(my_file)
   .filter(_.split(",").length == cols)
   .map {line =>
        // your code
   };

假设变量 cols 保存有效行中的列数。

您可以为此使用 flatMap、Some 和 None:

def missingFeatures(stuff): Boolean = ??? // Determine if features is missing

val data = sc.textFile(my_file)
  .flatMap {line =>
    val parts = line.split(",");
    if(missingFeatures(parts)) None
    else Some(Vectors.dense(parts.slice(1, parts.length).map(x => x.toDouble).toArray))
};

这样可以避免多次映射 rdd。

Java 跳过空行的代码/header 来自 Spark RDD:

首先是进口:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;

现在,过滤器将总列与 17 或以 VendorID 开头的 header 列进行比较。

Function<String, Boolean> isInvalid = row -> (row.split(",").length == 17 && !(row.startsWith("VendorID")));
JavaRDD<String> taxis = sc.textFile("datasets/trip_yellow_taxi.data")
                        .filter(isInvalid);