MAPREDUCE error: method write in interface TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> cannot be applied to given types

package br.edu.ufam.anibrata;

import java.io.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;
import java.util.Arrays;
import java.util.HashSet;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.kohsuke.args4j.ParserProperties;

import tl.lin.data.array.ArrayListWritable;
import tl.lin.data.pair.PairOfStringInt;
import tl.lin.data.pair.PairOfWritables;
import br.edu.ufam.data.Dataset;

import com.google.gson.JsonSyntaxException;

public class BuildIndexWebTables extends Configured implements Tool {

    private static final Logger LOG = Logger.getLogger(BuildIndexWebTables.class);

    public static void main(String[] args) throws Exception 
    {
        ToolRunner.run(new BuildIndexWebTables(), args);
    }

    @Override
    public int run(String[] argv) throws Exception {
        // Creates a new job configuration for this Hadoop job.
        Args args = new Args();
        CmdLineParser parser = new CmdLineParser(args, ParserProperties.defaults().withUsageWidth(100));

        try 
        {
            parser.parseArgument(argv);
        }
        catch (CmdLineException e) 
        {
            System.err.println(e.getMessage());
            parser.printUsage(System.err);
            return -1;
        }

        Configuration conf = getConf();

        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.set("mapreduce.map.failures.maxpercent", "10");
        conf.set("mapreduce.max.map.failures.percent", "10");
        conf.set("mapred.max.map.failures.percent", "10");
        conf.set("mapred.map.failures.maxpercent", "10");

        conf.setBoolean("mapred.compress.map.output", true);
        conf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec");
        conf.setBoolean("mapreduce.map.output.compress", true);

        /*String inputPrefixes = args[0];
        String outputFile = args[1];*/

        Job job = Job.getInstance(conf);

        /*FileInputFormat.addInputPath(job, new Path(inputPrefixes));
        FileOutputFormat.setOutputPath(job, new Path(outputFile));*/

        FileInputFormat.setInputPaths(job, new Path(args.input));
        FileOutputFormat.setOutputPath(job, new Path(args.output));

        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, org.apache.hadoop.io.compress.GzipCodec.class);

        job.setMapperClass(BuildIndexWebTablesMapper.class);
        job.setReducerClass(BuildIndexWebTablesReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);     
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PairOfWritables.class);
        //job.setOutputFormatClass(MapFileOutputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        /*job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);*/


        job.setJarByClass(BuildIndexWebTables.class);
        job.setNumReduceTasks(args.numReducers);
        //job.setNumReduceTasks(500);

        System.out.println(Arrays.deepToString(FileInputFormat.getInputPaths(job)));

        // Delete the output directory if it exists already.
        Path outputDir = new Path(args.output);
        FileSystem.get(getConf()).delete(outputDir, true);

        long startTime = System.currentTimeMillis();
        job.waitForCompletion(true);
        System.out.println("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");

        return 0;
    }

    private BuildIndexWebTables() {}

    public static class Args 
    {
        @Option(name = "-input", metaVar = "[path]", required = true, usage = "input path")
        public String input;

        @Option(name = "-output", metaVar = "[path]", required = true, usage = "output path")
        public String output;

         @Option(name = "-reducers", metaVar = "[num]", required = false, usage = "number of reducers")
         public int numReducers = 1;
    }

    public static class BuildIndexWebTablesMapper extends Mapper<LongWritable, Text, Text, Text> {
        //public static final Log log = LogFactory.getLog(BuildIndexWebTablesMapper.class);
        private static final Text WORD = new Text();
        private static final Text OPVAL = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // Log to stdout file
            System.out.println("Map key : TEST");

            //log to the syslog file
            //log.info("Map key "+ key);

            /*if(log.isDebugEanbled()){
                log.debug("Map key "+ key);
            }*/

            Dataset ds;
            String pgTitle; // Table page title

            List<String> tokens = new ArrayList<String>(); // terms for frequency and other data

            ds = Dataset.fromJson(value.toString());    // Parse one JSON record of the corpus
            pgTitle = ds.getPageTitle();                // Must be initialized before it is used below; getPageTitle() is an assumed accessor on Dataset, since the original initialization was missing from the snippet
            String[][] rel = ds.getRelation();          // Extract the relation (stored column-major: rel[column][row])
            int numCols = rel.length;                   // Number of columns in the relation
            String[] attributes = new String[numCols];  // Attribute (header) row of the relation

            for (int j = 0; j < numCols; j++) {
                attributes[j] = rel[j][0];
            }

            int numRows = rel[0].length;                // Number of rows of the relation
            //dsTabNum = ds.getTableNum();              // Gets the table number from the json

            // Read terms from the relation (column-major, consistent with the attribute extraction above) and store them in tokens
            for (int i = 0; i < numRows; i++) {
                for (int j = 0; j < numCols; j++) {
                    String w = rel[j][i].toLowerCase().replaceAll("(^[^a-z]+|[^a-z]+$)", ""); // strip leading/trailing non-letters; note rel[j][i], not rel[i][j]
                    if (w.length() == 0)
                        continue;
                    w = w + "|" + pgTitle + "." + j + "|" + i; // term|pageTitle.columnNumber|rowNumber
                    tokens.add(w);
                }
            }

            // Emit postings.
            for (String token : tokens) {
                String[] tokenPart = token.split("\\|", -2);    // "|" must be escaped because split() takes a regex; a negative limit keeps trailing empty strings
                String newkey = tokenPart[0] + "|" + tokenPart[1];
                WORD.set(newkey);                               // term|pageTitle.column as key
                String valstr = tokenPart[2];                   // row number as value
                OPVAL.set(valstr);
                context.write(WORD, OPVAL);
            }
        }

    }

    public static class BuildIndexWebTablesReducer extends Reducer<Text, Text, Text, Text> {

        private static final Text TERM = new Text();
        private static final IntWritable TF = new IntWritable();
        private String PrevTerm = null;
        private int termFrequency = 0;

        @Override
        protected void reduce(Text key, Iterable<Text> textval, Context context) throws IOException, InterruptedException {

            Iterator<Text> iter = textval.iterator();
            IntWritable tnum = new IntWritable();
            ArrayListWritable<IntWritable> postings = new ArrayListWritable<IntWritable>();
            PairOfStringInt relColInfo = new PairOfStringInt();
            PairOfWritables keyVal = new PairOfWritables<PairOfStringInt, ArrayListWritable<IntWritable>>();

            if((!key.toString().equals(PrevTerm)) && (PrevTerm != null)) {
                String[] parseKey = PrevTerm.split("\\|", -2); // "|" escaped: split() takes a regex
                TERM.set(parseKey[0]);
                relColInfo.set(parseKey[1],termFrequency);
                keyVal.set(relColInfo, postings);
                context.write(TERM, keyVal);
                termFrequency = 0;
                postings.clear();
            }

            PrevTerm = key.toString();

            while (iter.hasNext()) {
                int tupleset = Integer.parseInt(iter.next().toString());
                tnum.set(tupleset);
                postings.add(tnum);
                termFrequency++;        
            }
        }
    }
}

I am running into the error below when I compile.

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:2.3.2:compile (default-compile) on project projeto-final: Compilation failure [ERROR] /home/cloudera/topicosBD-pis/topicosBD-pis/projeto-final/src/main/java/br/edu/ufam/anibrata/BuildIndexWebTables.java:[278,11] error: method write in interface TaskInputOutputContext cannot be applied to given types;

The line where this happens is "context.write(TERM, keyVal);". This code has some dependencies on my local machine, though. I am stuck on this error because I know nothing about it; I would appreciate it if someone could help me understand the source of the problem and how to solve it. I am very new to Hadoop/MapReduce. I have tried switching the OutputFormatClass between job.setOutputFormatClass(MapFileOutputFormat.class); and job.setOutputFormatClass(TextOutputFormat.class); both throw the same error. I am compiling with "mvn clean package".

Any help is much appreciated.

Thanks in advance.

As far as I can see, you are trying to write a Text key (TERM) and a value (keyVal) of type PairOfWritables to the context, but your reducer class extends Reducer with Text as its VALUEOUT (the last type parameter). You should change VALUEOUT to the correct type.
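
The compiler enforces this through the generic signature of write, which the context declares in terms of the class's type parameters:

// From org.apache.hadoop.mapreduce.TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>:
// the value passed to write() must match the declared VALUEOUT, so the context of a
// Reducer<Text, Text, Text, Text> only accepts write(Text, Text).
void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException;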

In your case:

public static class BuildIndexWebTablesReducer extends Reducer<Text, Text, Text, PairOfWritables>
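
Below is a minimal sketch of the corrected reducer, assuming you spell out the full generic type PairOfWritables<PairOfStringInt, ArrayListWritable<IntWritable>> (all of these classes are already imported in your file). Two side notes: in the mapreduce API a single reduce() call already receives every value for its key, so the PrevTerm carry-over bookkeeping can be dropped; and each posting needs its own IntWritable instance, since reusing one object would make every list element alias the same value. Your driver already calls job.setOutputValueClass(PairOfWritables.class), which matches this declaration.

public static class BuildIndexWebTablesReducer
        extends Reducer<Text, Text, Text, PairOfWritables<PairOfStringInt, ArrayListWritable<IntWritable>>> {

    private static final Text TERM = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> textval, Context context)
            throws IOException, InterruptedException {
        // Collect all row numbers (postings) for this term|pageTitle.column key.
        ArrayListWritable<IntWritable> postings = new ArrayListWritable<IntWritable>();
        int termFrequency = 0;
        for (Text val : textval) {
            postings.add(new IntWritable(Integer.parseInt(val.toString()))); // fresh object per posting
            termFrequency++;
        }

        // Split "term|pageTitle.column" back apart ("|" escaped because split() takes a regex).
        String[] parseKey = key.toString().split("\\|", -2);
        TERM.set(parseKey[0]);

        PairOfStringInt relColInfo = new PairOfStringInt();
        relColInfo.set(parseKey[1], termFrequency);

        PairOfWritables<PairOfStringInt, ArrayListWritable<IntWritable>> keyVal =
                new PairOfWritables<PairOfStringInt, ArrayListWritable<IntWritable>>();
        keyVal.set(relColInfo, postings);

        context.write(TERM, keyVal); // VALUEOUT now matches, so this compiles
    }
}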