Hadoop 2.7.3 伪分布式模式下用于日志分析的 Map Reduce 作业不是 运行
Map Reduce job for log analysis not running in Hadoop 2.7.3 pseudo distributed mode
我是大数据世界的新手,分配了一个 POC 来处理 Web 应用程序生成的日志。我已经在 linux VM 上以伪分布式模式成功设置了 hadoop,并成功地使用 flume 将 Web 服务器日志从 windows 服务器注入到 hdfs。接下来我编写了一个用于日志分析的 mapreduce 程序,在 eclipse 中 运行 很好。但是当我导出 jar 并将其移动到 hadoop VM 时,作业成功完成,输出目录在 hdfs 中创建但 part-* 文件为空。我已经通过在本地日食上测试代码来验证我的输入数据集。我尝试了远程应用程序调试,main 方法中的断点命中但 map 方法中的断点没有。我们将不胜感激。
我已经在互联网上搜索了足够多但找不到类似的东西。
下面是我的代码
public class OneLoadTransactionsSuccessCount {
public static class OneLoadLogMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private static final Logger logger = Logger.getLogger(OneLoadLogMapper.class);
private final static String SUCCESS_CODE = "0";
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
SimpleDateFormat sdf = new SimpleDateFormat("hh:mm dd MMM, yyyy");
String text = value.toString();
logger.info("text:::: " + text);
System.out.println("text:::: " + text);
int startingTag = text.indexOf("[#");
int endingTag = text.indexOf("#]");
if (startingTag != -1 && endingTag != -1) {
try {
String completeLog = text.substring(startingTag + 1, endingTag);
logger.info("completeLog:::: " + completeLog);
System.out.println("completeLog:::: " + completeLog);
String[] tokens = completeLog.split("\|");
if (tokens != null && tokens.length > 0) {
logger.info(tokens[1]);
System.out.println(tokens[1]);
if (tokens[6] != null) {
String responseXML = tokens[6];
logger.info("responseXML:::: " + responseXML);
System.out.println("responseXML:::: " + responseXML);
JAXBContext jaxbContext = JAXBContext.newInstance(LoadResponseMsg.class);
Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
StringReader reader = new StringReader(responseXML);
InputStream inputStream = new ByteArrayInputStream(
responseXML.getBytes(Charset.forName("UTF-8")));
TransformerFactory factory = TransformerFactory.newInstance();
Source xslt = new StreamSource(new File("removeNs.xslt"));
Transformer transformer = factory.newTransformer(xslt);
Source src = new StreamSource(inputStream);
transformer.transform(src, new StreamResult(new File("tempoutput.xml")));
File responseXMLFile = new File("tempoutput.xml");
jaxbUnmarshaller.setEventHandler(new ValidationEventHandler() {
@Override
public boolean handleEvent(ValidationEvent event) {
throw new RuntimeException(event.getMessage(), event.getLinkedException());
}
});
LoadResponseMsg loadResponseMsg = (LoadResponseMsg) jaxbUnmarshaller
.unmarshal(responseXMLFile);
if (loadResponseMsg != null) {
logger.info("reader:::: " + reader.toString());
System.out.println("reader:::: " + reader.toString());
if (loadResponseMsg.getResponseHeader().getResponseCode()
.equalsIgnoreCase(SUCCESS_CODE)) {
logger.info("status::: " + loadResponseMsg.getLoadResponse().getDescription());
System.out
.println("status::: " + loadResponseMsg.getLoadResponse().getDescription());
word.set(loadResponseMsg.getLoadResponse().getCompanyShortName());
context.write(word, one);
}
}
}
}
} catch (JAXBException e) {
logger.error(e);
} catch (IOException e) {
logger.error(e);
} catch (Exception e) {
logger.error(e);
}
}
}
}
public static class OneLoadLogReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
try {
Configuration conf = new Configuration();
System.out.println("in main method");
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(OneLoadTransactionsSuccessCount.class);
job.setMapperClass(OneLoadLogMapper.class);
job.setCombinerClass(OneLoadLogReducer.class);
job.setReducerClass(OneLoadLogReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.out.println("about to run the job");
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (Exception e) {
System.err.println(e);
e.printStackTrace();
}
}
}
所以涉及到一个IO操作。我在制作 jar 时将所需的文件添加到类路径,但我的开发环境是 windows 而我是 运行 它在 Linux 上。问题是因为不同的文件系统。该程序正在 Linux.
中的错误位置查找文件
我是大数据世界的新手,分配了一个 POC 来处理 Web 应用程序生成的日志。我已经在 linux VM 上以伪分布式模式成功设置了 hadoop,并成功地使用 flume 将 Web 服务器日志从 windows 服务器注入到 hdfs。接下来我编写了一个用于日志分析的 mapreduce 程序,在 eclipse 中 运行 很好。但是当我导出 jar 并将其移动到 hadoop VM 时,作业成功完成,输出目录在 hdfs 中创建但 part-* 文件为空。我已经通过在本地日食上测试代码来验证我的输入数据集。我尝试了远程应用程序调试,main 方法中的断点命中但 map 方法中的断点没有。我们将不胜感激。
我已经在互联网上搜索了足够多但找不到类似的东西。 下面是我的代码
public class OneLoadTransactionsSuccessCount {
public static class OneLoadLogMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private static final Logger logger = Logger.getLogger(OneLoadLogMapper.class);
private final static String SUCCESS_CODE = "0";
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
SimpleDateFormat sdf = new SimpleDateFormat("hh:mm dd MMM, yyyy");
String text = value.toString();
logger.info("text:::: " + text);
System.out.println("text:::: " + text);
int startingTag = text.indexOf("[#");
int endingTag = text.indexOf("#]");
if (startingTag != -1 && endingTag != -1) {
try {
String completeLog = text.substring(startingTag + 1, endingTag);
logger.info("completeLog:::: " + completeLog);
System.out.println("completeLog:::: " + completeLog);
String[] tokens = completeLog.split("\|");
if (tokens != null && tokens.length > 0) {
logger.info(tokens[1]);
System.out.println(tokens[1]);
if (tokens[6] != null) {
String responseXML = tokens[6];
logger.info("responseXML:::: " + responseXML);
System.out.println("responseXML:::: " + responseXML);
JAXBContext jaxbContext = JAXBContext.newInstance(LoadResponseMsg.class);
Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
StringReader reader = new StringReader(responseXML);
InputStream inputStream = new ByteArrayInputStream(
responseXML.getBytes(Charset.forName("UTF-8")));
TransformerFactory factory = TransformerFactory.newInstance();
Source xslt = new StreamSource(new File("removeNs.xslt"));
Transformer transformer = factory.newTransformer(xslt);
Source src = new StreamSource(inputStream);
transformer.transform(src, new StreamResult(new File("tempoutput.xml")));
File responseXMLFile = new File("tempoutput.xml");
jaxbUnmarshaller.setEventHandler(new ValidationEventHandler() {
@Override
public boolean handleEvent(ValidationEvent event) {
throw new RuntimeException(event.getMessage(), event.getLinkedException());
}
});
LoadResponseMsg loadResponseMsg = (LoadResponseMsg) jaxbUnmarshaller
.unmarshal(responseXMLFile);
if (loadResponseMsg != null) {
logger.info("reader:::: " + reader.toString());
System.out.println("reader:::: " + reader.toString());
if (loadResponseMsg.getResponseHeader().getResponseCode()
.equalsIgnoreCase(SUCCESS_CODE)) {
logger.info("status::: " + loadResponseMsg.getLoadResponse().getDescription());
System.out
.println("status::: " + loadResponseMsg.getLoadResponse().getDescription());
word.set(loadResponseMsg.getLoadResponse().getCompanyShortName());
context.write(word, one);
}
}
}
}
} catch (JAXBException e) {
logger.error(e);
} catch (IOException e) {
logger.error(e);
} catch (Exception e) {
logger.error(e);
}
}
}
}
public static class OneLoadLogReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
try {
Configuration conf = new Configuration();
System.out.println("in main method");
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(OneLoadTransactionsSuccessCount.class);
job.setMapperClass(OneLoadLogMapper.class);
job.setCombinerClass(OneLoadLogReducer.class);
job.setReducerClass(OneLoadLogReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.out.println("about to run the job");
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (Exception e) {
System.err.println(e);
e.printStackTrace();
}
}
}
所以涉及到一个IO操作。我在制作 jar 时将所需的文件添加到类路径,但我的开发环境是 windows 而我是 运行 它在 Linux 上。问题是因为不同的文件系统。该程序正在 Linux.
中的错误位置查找文件