TotalOrderPartion 与 ChainMapper
TotalOrderPartion with ChainMapper
我有一个 ChainMapper,它关联了 2 个映射器。我正在尝试对链中的最后一个映射器执行 TotalOrderPartition,但没有取得很大成功。
有没有办法根据链中第 N 个映射器上的一些采样来强制执行分区?
public class WordCountChain extends Configured implements Tool
{
@Override
public int run(String[] args) throws Exception
{
Job job = new Job(getConf(), "Word Count V1 (Chain)");
job.setJarByClass(getClass());
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
/*********** First Mapper ***********/
Configuration wcpMapperConf = new Configuration(false);
ChainMapper.addMapper(job, WordCountPreparationMapper.class, LongWritable.class, Text.class, Text.class, IntWritable.class, wcpMapperConf);
/*********** Second Mapper ***********/
Configuration wcMapperConf = new Configuration(false);
ChainMapper.addMapper(job, Mapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, wcMapperConf);
/******* This enforces the Sampling/Partitioning over the First Mapper *******/
//job.setInputFormatClass(SequenceFileInputFormat.class);
//InputSampler.Sampler<Text, IntWritable> sampler = new InputSampler.RandomSampler<Text, IntWritable>(0.1, 10000, 10);
//InputSampler.writePartitionFile(job, sampler);
//job.addCacheFile( new URI( TotalOrderPartitioner.getPartitionFile(getConf()) ) );
job.setNumReduceTasks(10);
job.setReducerClass(WordCountReducer.class);
return (job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception
{
int exitCode = ToolRunner.run(new WordCountChain(), args);
System.exit(exitCode);
}
}
不幸的是,RandomSampler 运行s 在作业开始之前,实际上它 运行s 当你调用
InputSampler.writePartitionFile(job, sampler);
这意味着它不会 运行 任何 Mapper 的输出,而是作业的输入数据集。
如果您需要根据第 N 个 Mapper 的输出进行分区,您可以将您的作业拆分为两个作业,一个 map-only 作业和一个 mapreduce 作业。第一个是 运行 映射器链,直到第 N 个映射器,然后只存储它的输出。第二个工作将根据输入(这将是第 N 个 Mapper 的输出)进行采样和分区,然后 运行 其余的 Mappers 和你的 Reducer。
我有一个 ChainMapper,它关联了 2 个映射器。我正在尝试对链中的最后一个映射器执行 TotalOrderPartition,但没有取得很大成功。
有没有办法根据链中第 N 个映射器上的一些采样来强制执行分区?
public class WordCountChain extends Configured implements Tool
{
@Override
public int run(String[] args) throws Exception
{
Job job = new Job(getConf(), "Word Count V1 (Chain)");
job.setJarByClass(getClass());
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
/*********** First Mapper ***********/
Configuration wcpMapperConf = new Configuration(false);
ChainMapper.addMapper(job, WordCountPreparationMapper.class, LongWritable.class, Text.class, Text.class, IntWritable.class, wcpMapperConf);
/*********** Second Mapper ***********/
Configuration wcMapperConf = new Configuration(false);
ChainMapper.addMapper(job, Mapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, wcMapperConf);
/******* This enforces the Sampling/Partitioning over the First Mapper *******/
//job.setInputFormatClass(SequenceFileInputFormat.class);
//InputSampler.Sampler<Text, IntWritable> sampler = new InputSampler.RandomSampler<Text, IntWritable>(0.1, 10000, 10);
//InputSampler.writePartitionFile(job, sampler);
//job.addCacheFile( new URI( TotalOrderPartitioner.getPartitionFile(getConf()) ) );
job.setNumReduceTasks(10);
job.setReducerClass(WordCountReducer.class);
return (job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception
{
int exitCode = ToolRunner.run(new WordCountChain(), args);
System.exit(exitCode);
}
}
不幸的是,RandomSampler 运行s 在作业开始之前,实际上它 运行s 当你调用
InputSampler.writePartitionFile(job, sampler);
这意味着它不会 运行 任何 Mapper 的输出,而是作业的输入数据集。
如果您需要根据第 N 个 Mapper 的输出进行分区,您可以将您的作业拆分为两个作业,一个 map-only 作业和一个 mapreduce 作业。第一个是 运行 映射器链,直到第 N 个映射器,然后只存储它的输出。第二个工作将根据输入(这将是第 N 个 Mapper 的输出)进行采样和分区,然后 运行 其余的 Mappers 和你的 Reducer。