如何在 MapReduce 作业开始使用 JobControl 之前执行操作
How to execute actions before MapReduce job starts using JobControl
我有控制 n 个作业链的 JobControl。
for (int i = 0; i < iterations; i++) {
Job eStep = EStepJob.createJob(config);
Job mStep = MStepJob.createJob(config);
emChain.add(new ControlledJob(eStep, getDeps(emChain)));
emChain.add(new ControlledJob(mStep, getDeps(emChain)));
}
jobControl.addJobCollection(emChain);
我只想清理输出目录,并且只在每个作业开始之前;
但在作业初始化时不得清除目录。
我目前的解决方案是将清除代码放入映射阶段,这会大大减慢执行速度。
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(context.getConfiguration());
if (fs.exists(new Path(context.getConfiguration().get(
AR_PROBS_OUTPUT)))) {
fs.delete(
new Path(context.getConfiguration()
.get(AR_PROBS_OUTPUT)), true);
}
有没有更合适的方法?
您可以在初始化作业时将输出存储到临时目录中。
作业完成后,您可以删除临时目录。
然后你可以检查一下,输出是否需要提交?如果是,那么您可以使用 OutputCommitter 提交输出。
请检查下面link:
https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/OutputCommitter.html
您可以使用 Mapper.setup() 方法。它是在任何节点上启动任何地图任务之前执行的方法。
我相信您在代码中初始化文件系统时正在使用 HDFS。
无论如何,代码应该以同样的方式工作。但是它被执行的次数将等于生成的Mapper任务的数量,而不是每个Mapper任务被执行的次数!
我有控制 n 个作业链的 JobControl。
for (int i = 0; i < iterations; i++) {
Job eStep = EStepJob.createJob(config);
Job mStep = MStepJob.createJob(config);
emChain.add(new ControlledJob(eStep, getDeps(emChain)));
emChain.add(new ControlledJob(mStep, getDeps(emChain)));
}
jobControl.addJobCollection(emChain);
我只想清理输出目录,并且只在每个作业开始之前; 但在作业初始化时不得清除目录。 我目前的解决方案是将清除代码放入映射阶段,这会大大减慢执行速度。
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(context.getConfiguration());
if (fs.exists(new Path(context.getConfiguration().get(
AR_PROBS_OUTPUT)))) {
fs.delete(
new Path(context.getConfiguration()
.get(AR_PROBS_OUTPUT)), true);
}
有没有更合适的方法?
您可以在初始化作业时将输出存储到临时目录中。 作业完成后,您可以删除临时目录。
然后你可以检查一下,输出是否需要提交?如果是,那么您可以使用 OutputCommitter 提交输出。
请检查下面link:
https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapred/OutputCommitter.html
您可以使用 Mapper.setup() 方法。它是在任何节点上启动任何地图任务之前执行的方法。 我相信您在代码中初始化文件系统时正在使用 HDFS。
无论如何,代码应该以同样的方式工作。但是它被执行的次数将等于生成的Mapper任务的数量,而不是每个Mapper任务被执行的次数!