Multiple file storage in HDFS using Apache Spark
I am working on a project that involves using HDFS for storage and Apache Spark for computation.
I have a directory in HDFS that contains several text files, all at the same depth. I want to process all of these files using Spark and store their corresponding results back in HDFS, with one output file for each input file.
For example, suppose I have a directory containing 1000 text files, all at the same depth.
I am reading all of these files using a wildcard:
sc.wholeTextFiles("hdfs://localhost:9000/home/akshat/files/*.txt")
Then I process them using Spark to get the corresponding RDD, and I save the result using
result.saveAsTextFile("hdfs://localhost:9000/home/akshat/final")
but this puts the results for all the input files into a single output; I want to take each file, process it separately, and store each file's output separately.
What should my next approach be?
Thanks in advance!
You can do this by using wholeTextFiles(). Note: the approach below processes the files one by one.
val data = sc.wholeTextFiles("hdfs://master:port/vijay/mywordcount/")
val files = data.map { case (filename, content) => filename }

def doSomething(file: String) = {
  println(file)
  // your logic of processing a single file comes here
  val logData = sc.textFile(file)
  val numAs = logData.filter(line => line.contains("a")).count()
  println("Lines with a: %s".format(numAs))
  // save rdd of single file processed data to hdfs comes here
}

files.collect.foreach(filename => {
  doSomething(filename)
})
where:
- hdfs://master:port/vijay/mywordcount/ --- your HDFS directory
- data - org.apache.spark.rdd.RDD[(String, String)]
- files - org.apache.spark.rdd.RDD[String] - the file names
- doSomething(filename) - your logic
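The snippet above leaves the save step as a placeholder comment. Below is a minimal sketch of how doSomething could be completed so that each input file gets its own output; the output base path and the per-file logic (keeping lines that contain "a") are assumptions for illustration, not part of the original answer.

// sketch only: the output base path below is assumed
val outputBase = "hdfs://localhost:9000/home/akshat/final"

def doSomething(file: String): Unit = {
  // stand-in per-file logic: keep the lines that contain "a"
  val logData = sc.textFile(file)
  val processed = logData.filter(line => line.contains("a"))
  // derive a per-file output location from the input file name
  val fileName = file.split('/').last
  processed.saveAsTextFile(outputBase + "/" + fileName)   // one output directory per input file
}

Note that saveAsTextFile() writes a directory of part files, so with this sketch each input file ends up with its own output directory under outputBase.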
Update: multiple output files
/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

/* hadoop */
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

/* java */
import java.io.Serializable

import org.apache.log4j.Logger
import org.apache.log4j.Level

/* Custom TextOutputFormat */
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {

  // do not write the key into the output file; only the value is written
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  // name each part file after its key (the input file name)
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "-" + name   // for output hdfs://Output_dir/inputFilename-part-****
    // key.asInstanceOf[String] + "/" + name // for output hdfs://Output_dir/inputFilename/part-**** [inputFilename as a directory for its part files]
}

/* Spark Context */
object Spark {
  val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

/* WordCount Processing */
object Process extends Serializable {
  def apply(filename: String): org.apache.spark.rdd.RDD[(String, String)] = {
    println("i am called.....")
    val simple_path = filename.split('/').last
    val lines = Spark.sc.textFile(filename)
    val counts = lines.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _) // (word, count)
    val fname_word_counts = counts.map(x => (simple_path, x._1 + "\t" + x._2)) // (filename, word\tcount)
    fname_word_counts
  }
}

object SimpleApp {
  def main(args: Array[String]) {
    //Logger.getLogger("org").setLevel(Level.OFF)
    //Logger.getLogger("akka").setLevel(Level.OFF)

    // input and output paths
    val INPUT_PATH = "hdfs://master:8020/vijay/mywordcount/"
    val OUTPUT_PATH = "hdfs://master:8020/vijay/mywordcount/output/"

    // context
    val context = Spark.sc
    val data = context.wholeTextFiles(INPUT_PATH)

    // final output RDD
    var output: org.apache.spark.rdd.RDD[(String, String)] = context.emptyRDD

    // files to process
    val files = data.map { case (filename, content) => filename }

    // apply wordcount processing on each file received via wholeTextFiles
    files.collect.foreach(filename => {
      output = output.union(Process(filename))
    })

    //output.saveAsTextFile(OUTPUT_PATH) // this would save the output as (filename,word\tcount)
    output.saveAsHadoopFile(OUTPUT_PATH, classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat]) // custom output format

    // close context
    context.stop()
  }
}
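A note on the custom output format: with the old mapred API, generateActualKey() decides what key is actually written to each record (returning NullWritable.get() means only the value, word\tcount, is written), while generateFileNameForKeyValue() decides which file a record goes to; prefixing the key (the input file name) to the default part name is what produces one set of output files per input file.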
Environment:
- Scala compiler version 2.10.2
- spark-1.2.0-bin-hadoop2.3
- Hadoop 2.3.0-cdh5.0.3
Sample output:
[ramisetty@node-1 stack]$ hadoop fs -ls /vijay/mywordcount/output
Found 5 items
-rw-r--r-- 3 ramisetty supergroup 0 2015-06-09 03:49 /vijay/mywordcount/output/_SUCCESS
-rw-r--r-- 3 ramisetty supergroup 40 2015-06-09 03:49 /vijay/mywordcount/output/file1.txt-part-00000
-rw-r--r-- 3 ramisetty supergroup 8 2015-06-09 03:49 /vijay/mywordcount/output/file1.txt-part-00001
-rw-r--r-- 3 ramisetty supergroup 44 2015-06-09 03:49 /vijay/mywordcount/output/file2.txt-part-00002
-rw-r--r-- 3 ramisetty supergroup 8 2015-06-09 03:49 /vijay/mywordcount/output/file2.txt-part-00003
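Since wholeTextFiles() already returns (filename, content) pairs, a possible variation (my own sketch, not part of the original answer; it reuses INPUT_PATH, OUTPUT_PATH and RDDMultipleTextOutputFormat from the code above) is to key the word counts by file name directly, which avoids collecting the file names to the driver and re-reading every file with textFile():

val data = Spark.sc.wholeTextFiles(INPUT_PATH)
val output = data.flatMap { case (path, content) =>
  val fileName = path.split('/').last
  // count words of this one file on its content string
  content.split("\\s+")
    .filter(_.nonEmpty)
    .groupBy(identity)
    .toSeq
    .map { case (word, occurrences) => (fileName, word + "\t" + occurrences.length) }
}
output.saveAsHadoopFile(OUTPUT_PATH, classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])

The per-file counting here is done in plain Scala on each file's content string, which is fine as long as each file fits in memory, an assumption wholeTextFiles() already makes.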