将字符串转换为 JavaRDD<String>
Convert String to JavaRDD<String>
我想对目录中的每个文本文件进行一些计算,然后使用结果计算另一个值。
从我使用的目录读取文件:
JavaPairRDD<String, String> textFiles = sc.wholeTextFiles(PATH);
接下来,针对每个文件
textFiles.foreach(file -> processFile(file));
我想做一些魔术,比如计算常用词。
我可以访问文件的路径及其内容。
JavaRDD提供了我需要的flatMap、mapToPair、reduceByKey等方法。
问题是,有没有办法把JavaPairRDD的值转换成JavaRDD?
The question is, is there any way to convert the value of the JavaPairRDD to JavaRDD?
textFiles.keys(); //Return an RDD with the keys of each tuple.
textFiles.values(); // Return an RDD with the values of each tuple.
*** 更新:
根据您更新的问题,我认为以下内容可以满足您的需求。我在目录 "tmp" 中创建了两个 CSV 文件。
one.csv:
one,1
two,2
three,3
two.csv:
four,4
five,5
six,6
然后运行本地代码:
String appName = UUID.randomUUID().toString();
SparkConf sc = new SparkConf().setAppName(appName).setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaPairRDD<String, String> fS = jsc.wholeTextFiles("tmp");
System.out.println("File names:");
fS.keys().collect().forEach(new Consumer<String>(){
public void accept(String t)
{
System.out.println(t);
}});
System.out.println("File content:");
fS.values().collect().forEach(new Consumer<String>(){
public void accept(String t)
{
System.out.println(t);
}});
jsc.close();
它产生以下输出(我删除了所有不必要的 Spark 输出并编辑了我的目录路径)
File names:
file:/......[my dir here]/one.csv
file:/......[my dir here]/two.csv
File content:
one,1
two,2
three,3
four,4
five,5
six,6
看来这就是您要的...
我想对目录中的每个文本文件进行一些计算,然后使用结果计算另一个值。
从我使用的目录读取文件:
JavaPairRDD<String, String> textFiles = sc.wholeTextFiles(PATH);
接下来,针对每个文件
textFiles.foreach(file -> processFile(file));
我想做一些魔术,比如计算常用词。 我可以访问文件的路径及其内容。
JavaRDD提供了我需要的flatMap、mapToPair、reduceByKey等方法。 问题是,有没有办法把JavaPairRDD的值转换成JavaRDD?
The question is, is there any way to convert the value of the JavaPairRDD to JavaRDD?
textFiles.keys(); //Return an RDD with the keys of each tuple.
textFiles.values(); // Return an RDD with the values of each tuple.
*** 更新:
根据您更新的问题,我认为以下内容可以满足您的需求。我在目录 "tmp" 中创建了两个 CSV 文件。
one.csv:
one,1
two,2
three,3
two.csv:
four,4
five,5
six,6
然后运行本地代码:
String appName = UUID.randomUUID().toString();
SparkConf sc = new SparkConf().setAppName(appName).setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaPairRDD<String, String> fS = jsc.wholeTextFiles("tmp");
System.out.println("File names:");
fS.keys().collect().forEach(new Consumer<String>(){
public void accept(String t)
{
System.out.println(t);
}});
System.out.println("File content:");
fS.values().collect().forEach(new Consumer<String>(){
public void accept(String t)
{
System.out.println(t);
}});
jsc.close();
它产生以下输出(我删除了所有不必要的 Spark 输出并编辑了我的目录路径)
File names:
file:/......[my dir here]/one.csv
file:/......[my dir here]/two.csv
File content:
one,1
two,2
three,3
four,4
five,5
six,6
看来这就是您要的...