当 运行 通过终端时,Hadoop 作业不映射任何输出,但它在 eclipse 中工作正常

Hadoop job not mapping any output when run through terminal but it works fine in eclipse

我这里有一段代码。

import java.io.IOException;
import java.util.ArrayList;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpression;
import javax.xml.xpath.XPathFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.xerces.parsers.DOMParser;
import org.w3c.dom.Document;
import org.w3c.dom.NamedNodeMap;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;


public class XMLParser {    

public static ArrayList<String> a= new ArrayList<>();
public static XPathFactory xpf= XPathFactory.newInstance();
public static javax.xml.xpath.XPath xp= xpf.newXPath();
public static XPathExpression xpe;

/**
 * Reads records that are delimited by a specific begin/end tag.
 */
public static class XmlInputFormat extends TextInputFormat {

    public static final String START_TAG_KEY = "xmlinput.start";
    public static final String END_TAG_KEY = "xmlinput.end";


    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        XmlRecordReader MyXmlReader = new XmlRecordReader();
        try {
            MyXmlReader.initialize(split, context);
        } catch (IOException | InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return MyXmlReader;
    }

    /**
     * XMLRecordReader class to read through a given xml document to output
     * xml blocks as records as specified by the start tag and end tag
     *
     */

    public static class XmlRecordReader extends RecordReader<LongWritable, Text> {
        private byte[] startTag;
        private byte[] endTag;
        private long start;
        private long end;
        private FSDataInputStream fsin;         
        private DataOutputBuffer buffer = new DataOutputBuffer();
        private int j=0;    //pointer to step back 

        private LongWritable key = new LongWritable();
        private Text value = new Text();
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            startTag = conf.get(START_TAG_KEY).getBytes("utf-8");
            endTag = conf.get(END_TAG_KEY).getBytes("utf-8");
            FileSplit fileSplit = (FileSplit) split;

            // open the file and seek to the start of the split
            start = fileSplit.getStart();
            end = start + fileSplit.getLength();
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            fsin = fs.open(fileSplit.getPath());
            fsin.seek(start);               

        }
        @Override
        public boolean nextKeyValue() throws IOException,
        InterruptedException {
            if (fsin.getPos() < end) {
                if (readUntilMatch(startTag, false)) {
                    try {
                        fsin.seek(fsin.getPos()-j);
                        if (readUntilMatch(endTag, true)) {
                            key.set(fsin.getPos());
                            value.set(buffer.getData(), 0,
                                    buffer.getLength());
                            return true;
                        }
                    } finally {
                        buffer.reset();
                    }
                }
            }
            return false;
        }
        @Override
        public LongWritable getCurrentKey() throws IOException,
        InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException,
        InterruptedException {
            return value;
        }
        @Override
        public void close() throws IOException {
            fsin.close();
        }
        @Override
        public float getProgress() throws IOException {
            return (fsin.getPos() - start) / (float) (end - start);
        }

        private boolean readUntilMatch(byte[] match, boolean withinBlock)
                throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();
                // end of file:
                if (b == -1)
                    return false;                   
                // save to buffer:
                if (withinBlock)
                    buffer.write(b);
                // check if we're matching:
                if (b == match[i]) {
                    i++;                        
                    if(i >= match.length){
                        if(!withinBlock) j=i;
                        return true;                        
                    }
                } 
                else if(i>0 && b==0x20){
                    if(!withinBlock) j=i+1; //since the last char wasn't a match, so step back i+1 instead of i chars.
                    return true;
                }
                else i = 0;
                // see if we've passed the stop point:
                if (!withinBlock && i == 0 && fsin.getPos() >= end)
                    return false;
            }
        }
    }
}


public static class Map extends Mapper<LongWritable, Text,Text, Text> { 

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    protected void map(LongWritable key, Text value,Mapper.Context context)
            throws
            IOException, InterruptedException {
        String document = value.toString();

        try {                       

            DOMParser parser = new DOMParser();                     

            parser.parse(new InputSource(new java.io.StringReader(document)));
            Document dom = parser.getDocument();

            xpe= xp.compile("//TITLE");
            Node x= (Node) xpe.evaluate(dom, XPathConstants.NODE);

            for(int i=0; i<a.size(); i++){                  
                xpe= xp.compile("//"+a.get(i));
                Node n= (Node) xpe.evaluate(dom, XPathConstants.NODE);
                String a= x.getTextContent();
                String b= i+"\""+traverse(n)+"\"";
                context.write(new Text(a), new Text(b));
            }   


        }
        catch(Exception e){
            throw new IOException(e);
        }
    }

    public static String traverse(Node item) {
        // traversal 
        int x= item.getChildNodes().getLength();
        String s="";
        if(x>1){    //skipping the first #text node
            NodeList l= item.getChildNodes();           
            for(int i=1; i<l.getLength(); i+=2){    //skipping every alternate #text node
                Node n= l.item(i);
                if(i>1)
                    s+="|";
                //if(n.getTextContent().equals("//s+")) continue;
                s+=traverse(n);
            }           
        }

        else{ 
            NamedNodeMap m= item.getAttributes();
            for(int i=0; m!=null && i<m.getLength(); i++){
                if(i>0) s+="|";
                s+= m.item(i).getTextContent().trim();
            }
            s+= item.getTextContent().trim();
        }

        return s;

    }

}

public static class Reduce
extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {          
        StringBuilder sb= new StringBuilder();
        int i=0;
        for(String s: a){
            if(i>0)
                sb.append(",");
            sb.append(s);
            i++;
        }

        context.write(new Text(sb.toString()), null);

    }
    @Override
    protected void cleanup( Context context)
            throws IOException, InterruptedException {
        context.write(null, null);
    }       

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {      

        String[] arr= new String[a.size()];
        for(Text value : values){
            String s= value.toString();
            int j= s.indexOf('\"');
            String x= s.substring(0,j);
            int i= Integer.parseInt(x);
            arr[i]= s.substring(j);             
        }

        StringBuilder sb= new StringBuilder();

        for(int i=0; i<arr.length; i++)
            sb.append(arr[i]).append(",");

        context.write(null,new Text(sb.toString()));

    } 
}



@SuppressWarnings("deprecation")
public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
    // TODO Auto-generated method stub      
    a.add("TITLE");
    a.add("ARTIST");
    a.add("COUNTRY");
    a.add("PRICE");

    Configuration conf = new Configuration();

    conf.set("xmlinput.start", "<CD>");
    conf.set("xmlinput.end", "</CD>");

    Job job = new Job(conf);
    job.setJarByClass(XMLParser.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setMapperClass(XMLParser.Map.class);
    job.setReducerClass(XMLParser.Reduce.class);
    job.setInputFormatClass(XmlInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    Path outPath= new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outPath);
    FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
    if (dfs.exists(outPath)) {
        dfs.delete(outPath, true);
    }
    job.waitForCompletion(true);

}
}

代码基本上解析任何给定的 XML 文件,并将其转换为 csv 格式。它是使用 map-reduce 范式编写的。在这种情况下,我正在输入一个特定的 xml 文件,其中包含如下内容:

<CATALOG>
<CD>
    <TITLE>Empire Burlesque</TITLE>
    <ARTIST>Bob Dylan</ARTIST>
    <COUNTRY>USA</COUNTRY>
    <COMPANY>Columbia</COMPANY>
    <PRICE>10.90</PRICE>
    <YEAR>1985</YEAR>
</CD>
<CD>
    <TITLE>Hide your heart</TITLE>
    <ARTIST>Bonnie Tyler</ARTIST>
    <COUNTRY>UK</COUNTRY>
    <COMPANY>CBS Records</COMPANY>
    <PRICE>9.90</PRICE>
    <YEAR>1988</YEAR>
</CD>
<CD>
    <TITLE>Greatest Hits</TITLE>
    <ARTIST>Dolly Parton</ARTIST>
    <COUNTRY>USA</COUNTRY>
    <COMPANY>RCA</COMPANY>
    <PRICE>9.90</PRICE>
    <YEAR>1982</YEAR>
</CD>
<CD>
    <TITLE>Still got the blues</TITLE>
    <ARTIST>Gary Moore</ARTIST>
    <COUNTRY>UK</COUNTRY>
    <COMPANY>Virgin records</COMPANY>
    <PRICE>10.20</PRICE>
    <YEAR>1990</YEAR>
</CD>
<CD>
    <TITLE>Eros</TITLE>
    <ARTIST>Eros Ramazzotti</ARTIST>
    <COUNTRY>EU</COUNTRY>
    <COMPANY>BMG</COMPANY>
    <PRICE>9.90</PRICE>
    <YEAR>1997</YEAR>
</CD>
<CD>
    <TITLE>One night only</TITLE>
    <ARTIST>Bee Gees</ARTIST>
    <COUNTRY>UK</COUNTRY>
    <COMPANY>Polydor</COMPANY>
    <PRICE>10.90</PRICE>
    <YEAR>1998</YEAR>
</CD>
<CD>
    <TITLE>Sylvias Mother</TITLE>
    <ARTIST>Dr.Hook</ARTIST>
    <COUNTRY>UK</COUNTRY>
    <COMPANY>CBS</COMPANY>
    <PRICE>8.10</PRICE>
    <YEAR>1973</YEAR>
</CD>
<CD>
    <TITLE>Maggie May</TITLE>
    <ARTIST>Rod Stewart</ARTIST>
    <COUNTRY>UK</COUNTRY>
    <COMPANY>Pickwick</COMPANY>
    <PRICE>8.50</PRICE>
    <YEAR>1990</YEAR>
</CD>
<CD>
    <TITLE>Romanza</TITLE>
    <ARTIST>Andrea Bocelli</ARTIST>
    <COUNTRY>EU</COUNTRY>
    <COMPANY>Polydor</COMPANY>
    <PRICE>10.80</PRICE>
    <YEAR>1996</YEAR>
</CD>
<CD>
    <TITLE>When a man loves a woman</TITLE>
    <ARTIST>Percy Sledge</ARTIST>
    <COUNTRY>USA</COUNTRY>
    <COMPANY>Atlantic</COMPANY>
    <PRICE>8.70</PRICE>
    <YEAR>1987</YEAR>
</CD>
<CD>
    <TITLE>Black angel</TITLE>
    <ARTIST>Savage Rose</ARTIST>
    <COUNTRY>EU</COUNTRY>
    <COMPANY>Mega</COMPANY>
    <PRICE>10.90</PRICE>
    <YEAR>1995</YEAR>
</CD>
<CD>
    <TITLE>1999 Grammy Nominees</TITLE>
    <ARTIST>Many</ARTIST>
    <COUNTRY>USA</COUNTRY>
    <COMPANY>Grammy</COMPANY>
    <PRICE>10.20</PRICE>
    <YEAR>1999</YEAR>
</CD>
<CD>
    <TITLE>For the good times</TITLE>
    <ARTIST>Kenny Rogers</ARTIST>
    <COUNTRY>UK</COUNTRY>
    <COMPANY>Mucik Master</COMPANY>
    <PRICE>8.70</PRICE>
    <YEAR>1995</YEAR>
</CD>
<CD>
    <TITLE>Big Willie style</TITLE>
    <ARTIST>Will Smith</ARTIST>
    <COUNTRY>USA</COUNTRY>
    <COMPANY>Columbia</COMPANY>
    <PRICE>9.90</PRICE>
    <YEAR>1997</YEAR>
</CD>
<CD>
    <TITLE>Tupelo Honey</TITLE>
    <ARTIST>Van Morrison</ARTIST>
    <COUNTRY>UK</COUNTRY>
    <COMPANY>Polydor</COMPANY>
    <PRICE>8.20</PRICE>
    <YEAR>1971</YEAR>
</CD>
<CD>
    <TITLE>Soulsville</TITLE>
    <ARTIST>Jorn Hoel</ARTIST>
    <COUNTRY>Norway</COUNTRY>
    <COMPANY>WEA</COMPANY>
    <PRICE>7.90</PRICE>
    <YEAR>1996</YEAR>
</CD>
<CD>
    <TITLE>The very best of</TITLE>
    <ARTIST>Cat Stevens</ARTIST>
    <COUNTRY>UK</COUNTRY>
    <COMPANY>Island</COMPANY>
    <PRICE>8.90</PRICE>
    <YEAR>1990</YEAR>
</CD>
<CD>
    <TITLE>Stop</TITLE>
    <ARTIST>Sam Brown</ARTIST>
    <COUNTRY>UK</COUNTRY>
    <COMPANY>A and M</COMPANY>
    <PRICE>8.90</PRICE>
    <YEAR>1988</YEAR>
</CD>
<CD>
    <TITLE>Bridge of Spies</TITLE>
    <ARTIST>T'Pau</ARTIST>
    <COUNTRY>UK</COUNTRY>
    <COMPANY>Siren</COMPANY>
    <PRICE>7.90</PRICE>
    <YEAR>1987</YEAR>
</CD>
<CD>
    <TITLE>Private Dancer</TITLE>
    <ARTIST>Tina Turner</ARTIST>
    <COUNTRY>UK</COUNTRY>
    <COMPANY>Capitol</COMPANY>
    <PRICE>8.90</PRICE>
    <YEAR>1983</YEAR>
</CD>
<CD>
    <TITLE>Midt om natten</TITLE>
    <ARTIST>Kim Larsen</ARTIST>
    <COUNTRY>EU</COUNTRY>
    <COMPANY>Medley</COMPANY>
    <PRICE>7.80</PRICE>
    <YEAR>1983</YEAR>
</CD>
<CD>
    <TITLE>Pavarotti Gala Concert</TITLE>
    <ARTIST>Luciano Pavarotti</ARTIST>
    <COUNTRY>UK</COUNTRY>
    <COMPANY>DECCA</COMPANY>
    <PRICE>9.90</PRICE>
    <YEAR>1991</YEAR>
</CD>
<CD>
    <TITLE>The dock of the bay</TITLE>
    <ARTIST>Otis Redding</ARTIST>
    <COUNTRY>USA</COUNTRY>
    <COMPANY>Atlantic</COMPANY>
    <PRICE>7.90</PRICE>
    <YEAR>1987</YEAR>
</CD>
<CD>
    <TITLE>Picture book</TITLE>
    <ARTIST>Simply Red</ARTIST>
    <COUNTRY>EU</COUNTRY>
    <COMPANY>Elektra</COMPANY>
    <PRICE>7.20</PRICE>
    <YEAR>1985</YEAR>
</CD>
<CD>
    <TITLE>Red</TITLE>
    <ARTIST>The Communards</ARTIST>
    <COUNTRY>UK</COUNTRY>
    <COMPANY>London</COMPANY>
    <PRICE>7.80</PRICE>
    <YEAR>1987</YEAR>
</CD>
<CD>
    <TITLE>Unchain my heart</TITLE>
    <ARTIST>Joe Cocker</ARTIST>
    <COUNTRY>USA</COUNTRY>
    <COMPANY>EMI</COMPANY>
    <PRICE>8.20</PRICE>
    <YEAR>1987</YEAR>
</CD>

当我在 eclipse 中 运行 时,日志如下:

File System Counters
    FILE: Number of bytes read=15670
    FILE: Number of bytes written=557909
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
Map-Reduce Framework
    Map input records=26
    Map output records=104
    Map output bytes=2760
    Map output materialized bytes=2974
    Input split bytes=120
    Combine input records=0
    Combine output records=0
    Reduce input groups=26
    Reduce shuffle bytes=2974
    Reduce input records=104
    Reduce output records=28
    Spilled Records=208
    Shuffled Maps =1
    Failed Shuffles=0
    Merged Map outputs=1
    GC time elapsed (ms)=122
    CPU time spent (ms)=0
    Physical memory (bytes) snapshot=0
    Virtual memory (bytes) snapshot=0
    Total committed heap usage (bytes)=273162240
Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
File Input Format Counters 
    Bytes Read=4669
File Output Format Counters 
    Bytes Written=1193

这是输出:

TITLE,ARTIST,COUNTRY,PRICE
"1999 Grammy Nominees","Many","USA","10.20",
"Big Willie style","Will Smith","USA","9.90",
"Black angel","Savage Rose","EU","10.90",
"Bridge of Spies","T'Pau","UK","7.90",
"Empire Burlesque","Bob Dylan","USA","10.90",
"Eros","Eros Ramazzotti","EU","9.90",
"For the good times","Kenny Rogers","UK","8.70",
"Greatest Hits","Dolly Parton","USA","9.90",
"Hide your heart","Bonnie Tyler","UK","9.90",
"Maggie May","Rod Stewart","UK","8.50",
"Midt om natten","Kim Larsen","EU","7.80",
"One night only","Bee Gees","UK","10.90",
"Pavarotti Gala Concert","Luciano Pavarotti","UK","9.90",
"Picture book","Simply Red","EU","7.20",
"Private Dancer","Tina Turner","UK","8.90",
"Red","The Communards","UK","7.80",
"Romanza","Andrea Bocelli","EU","10.80",
"Soulsville","Jorn Hoel","Norway","7.90",
"Still got the blues","Gary Moore","UK","10.20",
"Stop","Sam Brown","UK","8.90",
"Sylvias Mother","Dr.Hook","UK","8.10",
"The dock of the bay","Otis Redding","USA","7.90",
"The very best of","Cat Stevens","UK","8.90",
"Tupelo Honey","Van Morrison","UK","8.20",
"Unchain my heart","Joe Cocker","USA","8.20",
"When a man loves a woman","Percy Sledge","USA","8.70",

如你所见,这里一切都很好。我还在输出文件夹中看到了所需的结果。不过这里有一个问题:当我在终端上使用以下使用 jar 文件的命令 运行 时:

hadoop jar ./jobs/xmlParser.jar XMLParser cd.xml output

我得到以下结果:

File System Counters
    FILE: Number of bytes read=6
    FILE: Number of bytes written=212423
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=4881
    HDFS: Number of bytes written=1
    HDFS: Number of read operations=7
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=2
Job Counters 
    Launched map tasks=1
    Launched reduce tasks=1
    Data-local map tasks=1
    Total time spent by all maps in occupied slots (ms)=6709
    Total time spent by all reduces in occupied slots (ms)=7079
    Total time spent by all map tasks (ms)=6709
    Total time spent by all reduce tasks (ms)=7079
    Total vcore-seconds taken by all map tasks=6709
    Total vcore-seconds taken by all reduce tasks=7079
    Total megabyte-seconds taken by all map tasks=6870016
    Total megabyte-seconds taken by all reduce tasks=7248896
Map-Reduce Framework
    Map input records=26
    Map output records=0
    Map output bytes=0
    Map output materialized bytes=6
    Input split bytes=108
    Combine input records=0
    Combine output records=0
    Reduce input groups=0
    Reduce shuffle bytes=6
    Reduce input records=0
    Reduce output records=2
    Spilled Records=0
    Shuffled Maps =1
    Failed Shuffles=0
    Merged Map outputs=1
    GC time elapsed (ms)=167
    CPU time spent (ms)=2250
    Physical memory (bytes) snapshot=329895936
    Virtual memory (bytes) snapshot=1601593344
    Total committed heap usage (bytes)=168235008
Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
File Input Format Counters 
    Bytes Read=4773
File Output Format Counters 
    Bytes Written=1

当我检查输出时,我看到一个空文本文件。 我还按照@Chris Gerkin 在此页面上提到的步骤进行操作: Empty output file generated after running hadoop job。但它没有用。 谁能帮我解决这个问题?谢谢。

尤里卡!我发现了错误。原来问题出在我使用的全局静态 ArrayList a 上。在伪分布式/分布式模式下,该全局变量对执行 mapping/reducing 作业的数据节点不可见。这就是为什么它不会输出任何东西。