如何根据if条件跳过spark rdd map操作中的行
How to skip line in spark rdd map action based on if condition
我有一个文件,我想把它交给 mllib 算法。所以我按照这个例子做了类似的事情:
val data = sc.textFile(my_file).
map {line =>
val parts = line.split(",");
Vectors.dense(parts.slice(1, parts.length).map(x => x.toDouble).toArray)
};
这很有效,只是有时我缺少一个功能。有时某行的一列没有任何数据,我想像这样扔掉这些行。
所以我想做这样的事情map{line => if(containsMissing(line) == true){ skipLine} else{ ... //same as before}}
如何执行此 skipLine 操作?
您可以使用filter
函数来过滤掉这样的行:
val data = sc.textFile(my_file)
.filter(_.split(",").length == cols)
.map {line =>
// your code
};
假设变量 cols
保存有效行中的列数。
您可以为此使用 flatMap、Some 和 None:
def missingFeatures(stuff): Boolean = ??? // Determine if features is missing
val data = sc.textFile(my_file)
.flatMap {line =>
val parts = line.split(",");
if(missingFeatures(parts)) None
else Some(Vectors.dense(parts.slice(1, parts.length).map(x => x.toDouble).toArray))
};
这样可以避免多次映射 rdd。
Java 跳过空行的代码/header 来自 Spark RDD:
首先是进口:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
现在,过滤器将总列与 17 或以 VendorID 开头的 header 列进行比较。
Function<String, Boolean> isInvalid = row -> (row.split(",").length == 17 && !(row.startsWith("VendorID")));
JavaRDD<String> taxis = sc.textFile("datasets/trip_yellow_taxi.data")
.filter(isInvalid);
我有一个文件,我想把它交给 mllib 算法。所以我按照这个例子做了类似的事情:
val data = sc.textFile(my_file).
map {line =>
val parts = line.split(",");
Vectors.dense(parts.slice(1, parts.length).map(x => x.toDouble).toArray)
};
这很有效,只是有时我缺少一个功能。有时某行的一列没有任何数据,我想像这样扔掉这些行。
所以我想做这样的事情map{line => if(containsMissing(line) == true){ skipLine} else{ ... //same as before}}
如何执行此 skipLine 操作?
您可以使用filter
函数来过滤掉这样的行:
val data = sc.textFile(my_file)
.filter(_.split(",").length == cols)
.map {line =>
// your code
};
假设变量 cols
保存有效行中的列数。
您可以为此使用 flatMap、Some 和 None:
def missingFeatures(stuff): Boolean = ??? // Determine if features is missing
val data = sc.textFile(my_file)
.flatMap {line =>
val parts = line.split(",");
if(missingFeatures(parts)) None
else Some(Vectors.dense(parts.slice(1, parts.length).map(x => x.toDouble).toArray))
};
这样可以避免多次映射 rdd。
Java 跳过空行的代码/header 来自 Spark RDD:
首先是进口:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
现在,过滤器将总列与 17 或以 VendorID 开头的 header 列进行比较。
Function<String, Boolean> isInvalid = row -> (row.split(",").length == 17 && !(row.startsWith("VendorID")));
JavaRDD<String> taxis = sc.textFile("datasets/trip_yellow_taxi.data")
.filter(isInvalid);