How to get the file name for a record in a Spark RDD (JavaRDD)

I am loading multiple files into a JavaRDD using:

JavaRDD<String> allLines = sc.textFile("hdfs://path/*.csv");

After loading the files, I modify each record and want to save them. However, I also need to save the original file name (ID) with each record for future reference. Is there any way to get the original file name for an individual record in the RDD? Thanks.

You need Spark's wholeTextFiles function. From the documentation:

For example, if you have the following files:

   hdfs://a-hdfs-path/part-00000
   hdfs://a-hdfs-path/part-00001
   ...
   hdfs://a-hdfs-path/part-nnnnn

Do val rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path"),

then rdd contains

   (a-hdfs-path/part-00000, its content)
   (a-hdfs-path/part-00001, its content)
   ...
   (a-hdfs-path/part-nnnnn, its content)

It returns an RDD of tuples, where the left side is the file name and the right side is the content.
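Since the question uses the Java API, here is a minimal sketch of the same idea in Java (assuming Spark 2.x, where FlatMapFunction returns an Iterator, and that sc is your JavaSparkContext):

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// Each element of the pair RDD is (fileName, entire file content).
JavaPairRDD<String, String> filesWithContent = sc.wholeTextFiles("hdfs://path/*.csv");

// Split each file's content back into lines, tagging every line with its file name.
JavaRDD<Tuple2<String, String>> namedLines = filesWithContent.flatMap(pair -> {
    List<Tuple2<String, String>> out = new ArrayList<>();
    for (String line : pair._2().split("\n")) {
        out.add(new Tuple2<>(pair._1(), line));
    }
    return out.iterator();
});

Keep in mind that wholeTextFiles reads each file as a single record, so it is best suited to many small files; very large files may perform poorly.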

You should be able to use toDebugString. Using wholeTextFiles will read in the entire content of your file as one element, whereas sc.textFile creates an RDD with each line as an individual element, as described here.

For example:

val file = sc.textFile("/user/user01/whatever.txt").cache()

val wordcount = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

wordcount.toDebugString
// res0: String =
// (2) ShuffledRDD[4] at reduceByKey at <console>:23 []
// +-(2) MapPartitionsRDD[3] at map at <console>:23 []
//    |  MapPartitionsRDD[2] at flatMap at <console>:23 []
//    |  /user/user01/whatever.txt MapPartitionsRDD[1] at textFile at <console>:21 []
//    |  /user/user01/whatever.txt HadoopRDD[0] at textFile at <console>:21 []
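The same lineage is available from the Java API via JavaRDD's toDebugString() method. Note, though, that this only shows which files feed the RDD as a whole; it does not tell you which file produced a particular record, so for per-record file names you still need one of the other approaches here.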

You can try something along the lines of the following snippet:

import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.api.java.JavaNewHadoopRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

// Read the files through the new Hadoop API so the InputSplit, which knows
// the path of the file backing each partition, is available.
JavaPairRDD<LongWritable, Text> javaPairRDD = sc.newAPIHadoopFile(
    "hdfs://path/*.csv",
    TextInputFormat.class,
    LongWritable.class,
    Text.class,
    new Configuration()
);
JavaNewHadoopRDD<LongWritable, Text> hadoopRDD = (JavaNewHadoopRDD<LongWritable, Text>) javaPairRDD;

JavaRDD<Tuple2<String, String>> namedLinesRDD = hadoopRDD.mapPartitionsWithInputSplit((inputSplit, lines) -> {
    FileSplit fileSplit = (FileSplit) inputSplit;
    String fileName = fileSplit.getPath().getName();

    Stream<Tuple2<String, String>> stream =
        StreamSupport.stream(Spliterators.spliteratorUnknownSize(lines, Spliterator.ORDERED), false)
            .map(line -> {
                String lineText = line._2().toString();
                // emit file name as key and line as a value
                return new Tuple2<>(fileName, lineText);
            });
    return stream.iterator();
}, true);
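Note that mapPartitionsWithInputSplit resolves the file name once per partition (each partition of a Hadoop-backed RDD corresponds to exactly one input split), so tagging every line adds essentially no per-record overhead.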

Update (for Java 7):

JavaRDD<Tuple2<String, String>> namedLinesRDD = hadoopRDD.mapPartitionsWithInputSplit(
    new Function2<InputSplit, Iterator<Tuple2<LongWritable, Text>>, Iterator<Tuple2<String, String>>>() {
        @Override
        public Iterator<Tuple2<String, String>> call(InputSplit inputSplit, final Iterator<Tuple2<LongWritable, Text>> lines) throws Exception {
            FileSplit fileSplit = (FileSplit) inputSplit;
            final String fileName = fileSplit.getPath().getName();
            return new Iterator<Tuple2<String, String>>() {
                @Override
                public boolean hasNext() {
                    return lines.hasNext();
                }
                @Override
                public Tuple2<String, String> next() {
                    Tuple2<LongWritable, Text> entry = lines.next();
                    return new Tuple2<String, String>(fileName, entry._2().toString());
                }
                @Override
                public void remove() {
                    // Java 7's Iterator has no default methods, so remove() must be implemented
                    throw new UnsupportedOperationException();
                }
            };
        }
    }, 
    true
);
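Whichever version you use, you can then carry the file name through your transformation and write the result back out. A minimal sketch (the trim() modification and the output path are placeholders, not part of the original question):

// Apply a (hypothetical) per-record modification while keeping the file name,
// then save; saveAsTextFile writes each tuple's toString(), e.g. "(a.csv,line)".
JavaRDD<Tuple2<String, String>> modified = namedLinesRDD.map(
    t -> new Tuple2<>(t._1(), t._2().trim())
);
modified.saveAsTextFile("hdfs://path/output"); // placeholder output location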