如何 traverse/iterate Spark Java 中的数据集?
How to traverse/iterate a Dataset in Spark Java?
我正在尝试遍历数据集以进行一些字符串相似度计算,例如 Jaro winkler 或余弦相似度。我将我的数据集转换为行列表,然后使用 for 语句遍历,这不是高效的 spark 方法。所以我期待在 Spark 中有更好的方法。
public class sample {
public static void main(String[] args) {
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Example").setMaster("local[*]"));
SQLContext sqlContext = new SQLContext(sc);
SparkSession spark = SparkSession.builder().appName("JavaTokenizerExample").getOrCreate();
List<Row> data = Arrays.asList(RowFactory.create("Mysore","Mysuru"),
RowFactory.create("Name","FirstName"));
StructType schema = new StructType(
new StructField[] { new StructField("Word1", DataTypes.StringType, true, Metadata.empty()),
new StructField("Word2", DataTypes.StringType, true, Metadata.empty()) });
Dataset<Row> oldDF = spark.createDataFrame(data, schema);
oldDF.show();
List<Row> rowslist = oldDF.collectAsList();
}
}
我找到了很多我不清楚的JavaRDD例子。数据集示例将对我有很大帮助。
您可以像下面那样使用 org.apache.spark.api.java.function.ForeachFunction
。
oldDF.foreach((ForeachFunction<Row>) row -> System.out.println(row));
对于不支持lambda表达式的旧java jdks,导入后可以使用如下:
import org.apache.spark.api.java.function.VoidFunction;
yourDataSet.toJavaRDD().foreach(new VoidFunction<Row>() {
public void call(Row r) throws Exception {
System.out.println(r.getAs("your column name here"));
}
});
我正在尝试遍历数据集以进行一些字符串相似度计算,例如 Jaro winkler 或余弦相似度。我将我的数据集转换为行列表,然后使用 for 语句遍历,这不是高效的 spark 方法。所以我期待在 Spark 中有更好的方法。
public class sample {
public static void main(String[] args) {
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Example").setMaster("local[*]"));
SQLContext sqlContext = new SQLContext(sc);
SparkSession spark = SparkSession.builder().appName("JavaTokenizerExample").getOrCreate();
List<Row> data = Arrays.asList(RowFactory.create("Mysore","Mysuru"),
RowFactory.create("Name","FirstName"));
StructType schema = new StructType(
new StructField[] { new StructField("Word1", DataTypes.StringType, true, Metadata.empty()),
new StructField("Word2", DataTypes.StringType, true, Metadata.empty()) });
Dataset<Row> oldDF = spark.createDataFrame(data, schema);
oldDF.show();
List<Row> rowslist = oldDF.collectAsList();
}
}
我找到了很多我不清楚的JavaRDD例子。数据集示例将对我有很大帮助。
您可以像下面那样使用 org.apache.spark.api.java.function.ForeachFunction
。
oldDF.foreach((ForeachFunction<Row>) row -> System.out.println(row));
对于不支持lambda表达式的旧java jdks,导入后可以使用如下:
import org.apache.spark.api.java.function.VoidFunction;
yourDataSet.toJavaRDD().foreach(new VoidFunction<Row>() {
public void call(Row r) throws Exception {
System.out.println(r.getAs("your column name here"));
}
});