Hadoop, MapReduce Custom Java Counters: Exception in thread "main" java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
The error is:
Exception in thread "main" java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING
at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:294)
at org.apache.hadoop.mapreduce.Job.getCounters(Job.java:762)
at com.aamend.hadoop.MapReduce.CountryIncomeConf.main(CountryIncomeConf.java:41)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
The stack trace indicates the problem is on this line:
Counter counter =
job.getCounters().findCounter(COUNTERS.MISSING_FIELDS_RECORD_COUNT);
I also have an enum named COUNTERS.
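(The enum itself is not shown in full here; a minimal sketch of a definition consistent with the two members referenced in the code below would be:)

    // Minimal sketch of the COUNTERS enum -- the actual definition is not
    // shown in the post, but it must declare at least these two members,
    // since both are used in the mapper and the driver.
    public enum COUNTERS {
        ERROR_COUNT,
        MISSING_FIELDS_RECORD_COUNT
    }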
Mapper:
import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
public class CountryIncomeMapper extends Mapper<Object, Text, Text, DoubleWritable> {

    private Logger logger = Logger.getLogger("FilterMapper");
    private final int incomeIndex = 54;
    private final int countryIndex = 0;
    private final int lenIndex = 58;
    String seperator = ",";

    @Override
    public void map(Object key, Text line, Context context) throws IOException,
            InterruptedException {
        if (line == null) {
            logger.info("null found.");
            context.getCounter(COUNTERS.ERROR_COUNT).increment(1);
            return;
        }
        if (line.toString().contains(
                "Adjusted net national income per capita (current US$)")) {
            String[] recordSplits = line.toString().split(seperator);
            logger.info("The data has been split.");
            if (recordSplits.length == lenIndex) {
                String countryName = recordSplits[countryIndex];
                try {
                    double income = Double.parseDouble(recordSplits[incomeIndex]);
                    context.write(new Text(countryName), new DoubleWritable(income));
                } catch (NumberFormatException nfe) {
                    logger.info("The value of income is in the wrong format: " + countryName);
                    context.getCounter(COUNTERS.MISSING_FIELDS_RECORD_COUNT).increment(1);
                    return;
                }
            }
        }
    }
}
Driver class:
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class CountryIncomeConf {

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {

        Path inputPath = new Path(args[0]);
        Path outputDir = new Path(args[1]);

        // Create configuration
        Configuration conf = new Configuration(true);

        // Create job
        Job job = new Job(conf, "CountryIncomeConf");
        job.setJarByClass(CountryIncomeConf.class);

        Counter counter =
                job.getCounters().findCounter(COUNTERS.MISSING_FIELDS_RECORD_COUNT);
        System.out.println("Error Counter = " + counter.getValue());

        // Setup MapReduce
        job.setMapperClass(CountryIncomeMapper.class);
        job.setNumReduceTasks(1);

        // Specify key / value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        // Input
        FileInputFormat.addInputPath(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // Output
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Delete output if exists
        FileSystem hdfs = FileSystem.get(conf);
        if (hdfs.exists(outputDir))
            hdfs.delete(outputDir, true);

        // Execute job
        int code = job.waitForCompletion(true) ? 0 : 1;
        System.exit(code);
    }
}
It looks like you are trying to fetch the counters before the job has been submitted. Job.getCounters() can only be called once the job is running or has completed; calling it while the job is still in the DEFINE state is exactly what triggers the IllegalStateException in your stack trace. Move the counter lookup after job.waitForCompletion(true).
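As a minimal sketch, the end of main() could be restructured like this (only the ordering changes; all the job setup stays as it is):

    // Submit the job and block until it finishes; counters are only
    // populated once the job has actually run.
    boolean success = job.waitForCompletion(true);

    // The job is now in a finished state, so getCounters() is legal.
    Counter counter =
            job.getCounters().findCounter(COUNTERS.MISSING_FIELDS_RECORD_COUNT);
    System.out.println("Error Counter = " + counter.getValue());

    System.exit(success ? 0 : 1);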
I ran into the same error during a sqoop export. In my case it occurred because the HDFS directory was empty. Once I populated the directory (it backed a Hive table), the sqoop export ran without problems.