如何在 Hadoop 中高效地将一个大数据集分割成多个小文件

How to divide a big dataset into multiple small files in Hadoop in an efficient way

我有一个大数据集,其中包含每个文件有 1M 条记录的文件,我想在 Hadoop 中将它分成一些文件,每个文件有 1000 条记录。我正在研究实现此目标的不同方案。一种是使拆分大小变小,以便每个映射器只获取少量记录(~1000 条记录),然后输出它们。这需要 运行 个效率不高的映射器。另一种解决方案是考虑一个减速器并将所有记录发送给它,然后它们在那里进行拆分。这对于 mapreduce 也是违反直觉的,因为所有工作仅由一个节点完成。将此数据集拆分成小文件的有效替代方法是什么?

如果你不是特别在意哪个记录到哪里去,那就事先算好你要的文件个数,放到配置里。然后你可以在映射器中有一个随机数生成器,它生成一个介于 0 和 (numFiles -1) 之间的随机数。将 num % numReducers 作为映射器输出的键,其中 numReducers 是您想要拥有的 reducer 的数量。

对于值,使用 MapWritable<IntWritable,RecordClass>,将 RecordClass 替换为便于存储记录本身的任何值。对于 IntWritable 放置原始随机数,表示它应该进入哪个文件。将剩余的记录放入 RecordClass 插槽。

在 reducer 中,从 map 中提取随机数,并根据该数字将记录写入文件(如果数字为 1,则写入文件 FileName1,如果数字为 2,则写入文件 FileName2 等)。

您可以使用 NLineInputFormat 指定应将多少条记录作为映射器的输入。

将 属性 'mapreduce.input.lineinputformat.linespermap' 设置为 1000 的倍数,以便合理数量的映射器是 spawned.In 映射器,使用多个输出将每 1000 条记录写入使用计数器递增逻辑的单独文件.

使用多个输出将数据拆分为 1000 条记录的示例代码(对于文本文件)

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class DataSplitter {

    public static class Map extends Mapper<LongWritable, Text, NullWritable, Text> {

        private Text outputValue = new Text();

        @SuppressWarnings("rawtypes")
        private MultipleOutputs multipleOutputs;

        private int fileCounter = 1;

        private List<String> recordList = new ArrayList<String>();

        @SuppressWarnings({ "rawtypes", "unchecked" })
        @Override
        protected void setup(Mapper<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {

            multipleOutputs = new MultipleOutputs(context);

        }

        @SuppressWarnings("unchecked")
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();

            recordList.add(line);

            if (recordList.size() == 1000) {

                for (int i = 0; i < recordList.size(); i++) {

                    outputValue.set(recordList.get(i));

                    multipleOutputs.write("mos", NullWritable.get(), outputValue, "output-" + fileCounter);

                }

                fileCounter++;

                recordList.clear();
            }

        }

        @Override
        protected void cleanup(Mapper<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {

            multipleOutputs.close();

            if (!recordList.isEmpty()) {

                for (int i = 0; i < recordList.size(); i++) {

                    outputValue.set(recordList.get(i));

                    context.write(NullWritable.get(), outputValue);

                }
                recordList.clear();

            }
        }

    }

    public static class Reduce extends Reducer<LongWritable, Text, NullWritable, Text> {

        private Text outputValue = new Text();

        @SuppressWarnings("rawtypes")
        private MultipleOutputs multipleOutputs;

        private int fileCounter = 1;

        private List<String> recordList = new ArrayList<String>();

        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Override
        protected void setup(Reducer<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            multipleOutputs = new MultipleOutputs(context);
        }

        @SuppressWarnings("unchecked")
        public void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            for (Text value : values) {

                String line = value.toString();

                recordList.add(line);

                if (recordList.size() == 1000) {

                    for (int i = 0; i < recordList.size(); i++) {

                        outputValue.set(recordList.get(i));

                        multipleOutputs.write("mos", NullWritable.get(), outputValue, "output-" + fileCounter);

                    }
                    fileCounter++;
                    recordList.clear();
                }

                if (!recordList.isEmpty()) {

                    for (int i = 0; i < recordList.size(); i++) {

                        outputValue.set(recordList.get(i));

                        context.write(NullWritable.get(), outputValue);

                    }
                }
            }

        }

        @Override
        protected void cleanup(Reducer<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            super.cleanup(context);
            multipleOutputs.close();
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "DataSplitter");
        job.setJarByClass(DataSplitter.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileSystem.get(conf).delete(new Path(args[1]), true);

        MultipleOutputs.addNamedOutput(job, "mos", TextOutputFormat.class, NullWritable.class, Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) == true ? 0 : 1);
    }

}

使用 spark 将大文件拆分为多个小文件。

以下示例将输入文件拆分为 2 个文件:

     scala> sc.textFile("/xyz-path/input-file",2).saveAsTextFile("/xyz-path/output-file")

textFile 中的第二个参数是 minPartitions,它使用默认分区程序。您还可以使用客户分区程序以获得更好的分区策略。详细了解自定义分区 here