如何在 Hadoop 中高效地将一个大数据集分割成多个小文件
How to divide a big dataset into multiple small files in Hadoop in an efficient way
我有一个大数据集,其中包含每个文件有 1M 条记录的文件,我想在 Hadoop 中将它分成一些文件,每个文件有 1000 条记录。我正在研究实现此目标的不同方案。一种是使拆分大小变小,以便每个映射器只获取少量记录(~1000 条记录),然后输出它们。这需要 运行 个效率不高的映射器。另一种解决方案是考虑一个减速器并将所有记录发送给它,然后它们在那里进行拆分。这对于 mapreduce 也是违反直觉的,因为所有工作仅由一个节点完成。将此数据集拆分成小文件的有效替代方法是什么?
如果你不是特别在意哪个记录到哪里去,那就事先算好你要的文件个数,放到配置里。然后你可以在映射器中有一个随机数生成器,它生成一个介于 0 和 (numFiles -1) 之间的随机数。将 num % numReducers
作为映射器输出的键,其中 numReducers 是您想要拥有的 reducer 的数量。
对于值,使用 MapWritable<IntWritable,RecordClass>
,将 RecordClass
替换为便于存储记录本身的任何值。对于 IntWritable
放置原始随机数,表示它应该进入哪个文件。将剩余的记录放入 RecordClass
插槽。
在 reducer 中,从 map 中提取随机数,并根据该数字将记录写入文件(如果数字为 1,则写入文件 FileName1,如果数字为 2,则写入文件 FileName2 等)。
您可以使用 NLineInputFormat 指定应将多少条记录作为映射器的输入。
将 属性 'mapreduce.input.lineinputformat.linespermap' 设置为 1000 的倍数,以便合理数量的映射器是 spawned.In 映射器,使用多个输出将每 1000 条记录写入使用计数器递增逻辑的单独文件.
使用多个输出将数据拆分为 1000 条记录的示例代码(对于文本文件)
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class DataSplitter {
public static class Map extends Mapper<LongWritable, Text, NullWritable, Text> {
private Text outputValue = new Text();
@SuppressWarnings("rawtypes")
private MultipleOutputs multipleOutputs;
private int fileCounter = 1;
private List<String> recordList = new ArrayList<String>();
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected void setup(Mapper<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
multipleOutputs = new MultipleOutputs(context);
}
@SuppressWarnings("unchecked")
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
recordList.add(line);
if (recordList.size() == 1000) {
for (int i = 0; i < recordList.size(); i++) {
outputValue.set(recordList.get(i));
multipleOutputs.write("mos", NullWritable.get(), outputValue, "output-" + fileCounter);
}
fileCounter++;
recordList.clear();
}
}
@Override
protected void cleanup(Mapper<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
multipleOutputs.close();
if (!recordList.isEmpty()) {
for (int i = 0; i < recordList.size(); i++) {
outputValue.set(recordList.get(i));
context.write(NullWritable.get(), outputValue);
}
recordList.clear();
}
}
}
public static class Reduce extends Reducer<LongWritable, Text, NullWritable, Text> {
private Text outputValue = new Text();
@SuppressWarnings("rawtypes")
private MultipleOutputs multipleOutputs;
private int fileCounter = 1;
private List<String> recordList = new ArrayList<String>();
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
protected void setup(Reducer<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
multipleOutputs = new MultipleOutputs(context);
}
@SuppressWarnings("unchecked")
public void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
String line = value.toString();
recordList.add(line);
if (recordList.size() == 1000) {
for (int i = 0; i < recordList.size(); i++) {
outputValue.set(recordList.get(i));
multipleOutputs.write("mos", NullWritable.get(), outputValue, "output-" + fileCounter);
}
fileCounter++;
recordList.clear();
}
if (!recordList.isEmpty()) {
for (int i = 0; i < recordList.size(); i++) {
outputValue.set(recordList.get(i));
context.write(NullWritable.get(), outputValue);
}
}
}
}
@Override
protected void cleanup(Reducer<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
super.cleanup(context);
multipleOutputs.close();
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
@SuppressWarnings("deprecation")
Job job = new Job(conf, "DataSplitter");
job.setJarByClass(DataSplitter.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileSystem.get(conf).delete(new Path(args[1]), true);
MultipleOutputs.addNamedOutput(job, "mos", TextOutputFormat.class, NullWritable.class, Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) == true ? 0 : 1);
}
}
使用 spark 将大文件拆分为多个小文件。
以下示例将输入文件拆分为 2 个文件:
scala> sc.textFile("/xyz-path/input-file",2).saveAsTextFile("/xyz-path/output-file")
textFile 中的第二个参数是 minPartitions,它使用默认分区程序。您还可以使用客户分区程序以获得更好的分区策略。详细了解自定义分区 here。
我有一个大数据集,其中包含每个文件有 1M 条记录的文件,我想在 Hadoop 中将它分成一些文件,每个文件有 1000 条记录。我正在研究实现此目标的不同方案。一种是使拆分大小变小,以便每个映射器只获取少量记录(~1000 条记录),然后输出它们。这需要 运行 个效率不高的映射器。另一种解决方案是考虑一个减速器并将所有记录发送给它,然后它们在那里进行拆分。这对于 mapreduce 也是违反直觉的,因为所有工作仅由一个节点完成。将此数据集拆分成小文件的有效替代方法是什么?
如果你不是特别在意哪个记录到哪里去,那就事先算好你要的文件个数,放到配置里。然后你可以在映射器中有一个随机数生成器,它生成一个介于 0 和 (numFiles -1) 之间的随机数。将 num % numReducers
作为映射器输出的键,其中 numReducers 是您想要拥有的 reducer 的数量。
对于值,使用 MapWritable<IntWritable,RecordClass>
,将 RecordClass
替换为便于存储记录本身的任何值。对于 IntWritable
放置原始随机数,表示它应该进入哪个文件。将剩余的记录放入 RecordClass
插槽。
在 reducer 中,从 map 中提取随机数,并根据该数字将记录写入文件(如果数字为 1,则写入文件 FileName1,如果数字为 2,则写入文件 FileName2 等)。
您可以使用 NLineInputFormat 指定应将多少条记录作为映射器的输入。
将 属性 'mapreduce.input.lineinputformat.linespermap' 设置为 1000 的倍数,以便合理数量的映射器是 spawned.In 映射器,使用多个输出将每 1000 条记录写入使用计数器递增逻辑的单独文件.
使用多个输出将数据拆分为 1000 条记录的示例代码(对于文本文件)
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class DataSplitter {
public static class Map extends Mapper<LongWritable, Text, NullWritable, Text> {
private Text outputValue = new Text();
@SuppressWarnings("rawtypes")
private MultipleOutputs multipleOutputs;
private int fileCounter = 1;
private List<String> recordList = new ArrayList<String>();
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected void setup(Mapper<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
multipleOutputs = new MultipleOutputs(context);
}
@SuppressWarnings("unchecked")
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
recordList.add(line);
if (recordList.size() == 1000) {
for (int i = 0; i < recordList.size(); i++) {
outputValue.set(recordList.get(i));
multipleOutputs.write("mos", NullWritable.get(), outputValue, "output-" + fileCounter);
}
fileCounter++;
recordList.clear();
}
}
@Override
protected void cleanup(Mapper<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
multipleOutputs.close();
if (!recordList.isEmpty()) {
for (int i = 0; i < recordList.size(); i++) {
outputValue.set(recordList.get(i));
context.write(NullWritable.get(), outputValue);
}
recordList.clear();
}
}
}
public static class Reduce extends Reducer<LongWritable, Text, NullWritable, Text> {
private Text outputValue = new Text();
@SuppressWarnings("rawtypes")
private MultipleOutputs multipleOutputs;
private int fileCounter = 1;
private List<String> recordList = new ArrayList<String>();
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
protected void setup(Reducer<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
multipleOutputs = new MultipleOutputs(context);
}
@SuppressWarnings("unchecked")
public void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
String line = value.toString();
recordList.add(line);
if (recordList.size() == 1000) {
for (int i = 0; i < recordList.size(); i++) {
outputValue.set(recordList.get(i));
multipleOutputs.write("mos", NullWritable.get(), outputValue, "output-" + fileCounter);
}
fileCounter++;
recordList.clear();
}
if (!recordList.isEmpty()) {
for (int i = 0; i < recordList.size(); i++) {
outputValue.set(recordList.get(i));
context.write(NullWritable.get(), outputValue);
}
}
}
}
@Override
protected void cleanup(Reducer<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
super.cleanup(context);
multipleOutputs.close();
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
@SuppressWarnings("deprecation")
Job job = new Job(conf, "DataSplitter");
job.setJarByClass(DataSplitter.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileSystem.get(conf).delete(new Path(args[1]), true);
MultipleOutputs.addNamedOutput(job, "mos", TextOutputFormat.class, NullWritable.class, Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) == true ? 0 : 1);
}
}
使用 spark 将大文件拆分为多个小文件。
以下示例将输入文件拆分为 2 个文件:
scala> sc.textFile("/xyz-path/input-file",2).saveAsTextFile("/xyz-path/output-file")
textFile 中的第二个参数是 minPartitions,它使用默认分区程序。您还可以使用客户分区程序以获得更好的分区策略。详细了解自定义分区 here。