MapReduce Wrong Output / Reducer Not Working

I am trying to collect the maximum and minimum temperatures for a specific station and then find the sum of the temperatures for each date, but I keep getting an error in my mapper. I have tried several other approaches, such as using a StringTokenizer, but it's the same thing: I get an error.

Sample input:

Station, Date (YYYYMMDD), Element, Temperature, flag1, flag2, othervalue

From the input I only need Station, Date (the key), Element, and Temperature.

USW00003889,20180101,TMAX,122,7,1700
USW00003889,20180101,TMIN,-67,7,1700
UK000056225,20180101,TOBS,56,7,1700
UK000056225,20180101,PRCP,0,7,1700
UK000056225,20180101,SNOW,0,7
USC00264341,20180101,SNWD,0,7,1700
USC00256837,20180101,PRCP,0,7,800
UK000056225,20180101,SNOW,0,7
UK000056225,20180101,SNWD,0,7,800
USW00003889,20180102,TMAX,12,E
USW00003889,20180102,TMIN,3,E
UK000056225,20180101,PRCP,42,E
SWE00138880,20180101,PRCP,50,E
UK000056225,20180101,PRCP,0,a
USC00256480,20180101,PRCP,0,7,700
USC00256480,20180101,SNOW,0,7
USC00256480,20180101,SNWD,0,7,700
SWE00138880,20180103,TMAX,-228,7,800
SWE00138880,20180103,TMIN,-328,7,800
USC00247342,20180101,PRCP,0,7,800
UK000056225,20180101,SNOW,0,7
SWE00137764,20180101,PRCP,63,E
UK000056225,20180101,SNWD,0,E
USW00003889,20180104,TMAX,-43,W
USW00003889,20180104,TMIN,-177,W
    public static class MaxMinMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private Text newDate = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {

            String stationID = "USW00003889";
            String[] tokens = value.toString().split(",");
            String station = "";
            String date = "";
            String element = "";
            int data = 0;

            station = tokens[0];
            date = tokens[1];
            element = tokens[2];
            data = Integer.parseInt(tokens[3]);

            if (stationID.equals(station) && (element.equals("TMAX") ||
                    element.equals("TMIN"))) {

                newDate.set(date);
                context.write(newDate, new IntWritable(data));
            }
        }
    }

    public static class MaxMinReducer
            extends Reducer<Text, Text, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {

            int sumResult = 0;
            int val1 = 0;
            int val2 = 0;

            while (values.iterator().hasNext()) {
                val1 = values.iterator().next().get();
                val2 = values.iterator().next().get();
                sumResult = val1 + val2;
            }

            result.set(sumResult);
            context.write(key, result);
        }
    }

        }

Please help me, thank you.

Update: validated each row with a condition and changed the data variable to a String (to be converted back to Integer -> IntWritable later).

            if (tokens.length <= 5) {
                station = tokens[0];
                date = tokens[1];
                element = tokens[2];
                data = tokens[3];
                otherValue = tokens[4];
            }else{
                station = tokens[0];
                date = tokens[1];
                element = tokens[2];
                data = tokens[3];
                otherValue = tokens[4];
                otherValue2 = tokens[5];
            }
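
As an aside, rows like `UK000056225,20180101,SNOW,0,7` show that the column count varies, so it may be worth rejecting unusable rows before calling `Integer.parseInt` at all. Here is a minimal plain-Java sketch of that idea (the class and method names are mine, not part of the job), assuming the comma-separated layout from the sample input:

```java
public class RowParser {

    /**
     * Returns {date, temperature} for valid TMAX/TMIN rows belonging to
     * stationID, or null for any row that can't be used.
     */
    public static String[] parse(String line, String stationID) {
        String[] tokens = line.split(",");
        if (tokens.length < 4) {
            return null; // too few columns to be usable
        }
        if (!stationID.equals(tokens[0])) {
            return null; // not the station we care about
        }
        String element = tokens[2];
        if (!element.equals("TMAX") && !element.equals("TMIN")) {
            return null; // only temperature extremes are wanted
        }
        try {
            Integer.parseInt(tokens[3]); // temperature must be numeric
        } catch (NumberFormatException e) {
            return null; // malformed temperature field; skip the row
        }
        return new String[] { tokens[1], tokens[3] };
    }
}
```

The same guards can be pasted into the mapper's `map()` body, replacing the `return null` branches with a plain `return;` so bad rows are silently skipped.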

Update 2: OK, I am now writing output to a file, but it is the wrong output. I need it to add the two values that share the same date (key). What am I doing wrong?

OUTPUT:

20180101    -67
20180101    122
20180102    3
20180102    12
20180104    -177
20180104    -43

Desired Output:

20180101    55
20180102    15
20180104    -220
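
The desired sums above can be reproduced outside Hadoop in a few lines of plain Java, which is a quick way to pin down what the reduce step is supposed to compute: group the mapper's (date, temperature) pairs by date and accumulate. The class and method names here are mine, for illustration only:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SumByDate {

    /** Sums values sharing the same date key, mimicking what reduce() should do. */
    public static Map<String, Integer> sum(String[][] pairs) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (String[] pair : pairs) {
            String date = pair[0];
            int temp = Integer.parseInt(pair[1]);
            totals.merge(date, temp, Integer::sum); // accumulate per key
        }
        return totals;
    }
}
```

Fed the six mapper outputs shown above, this yields 55, 15, and -220 for the three dates, matching the desired output.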

This is also the error I receive, even though I do get output.

    ERROR: (gcloud.dataproc.jobs.submit.hadoop) Job [8e31c44ccd394017a4a28b3b16471aca] failed with error:
Google Cloud Dataproc Agent reports job failure. If logs are available, they can be found at 'https://console.cloud.google.com/dataproc/jobs/8e31c44ccd394017a4a28b3b16471aca
?project=driven-airway-257512&region=us-central1' and in 'gs://dataproc-261a376e-7874-4151-b6b7-566c18758206-us-central1/google-cloud-dataproc-metainfo/f912a2f0-107f-40b6-94
56-b6a72cc8bfc4/jobs/8e31c44ccd394017a4a28b3b16471aca/driveroutput'.
    19/11/14 12:53:24 INFO client.RMProxy: Connecting to ResourceManager at cluster-1e8f-m/10.128.0.12:8032
19/11/14 12:53:25 INFO client.AHSProxy: Connecting to Application History server at cluster-1e8f-m/10.128.0.12:10200
19/11/14 12:53:26 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
19/11/14 12:53:26 INFO input.FileInputFormat: Total input files to process : 1
19/11/14 12:53:26 INFO mapreduce.JobSubmitter: number of splits:1
19/11/14 12:53:26 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
19/11/14 12:53:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1573654432484_0035
19/11/14 12:53:27 INFO impl.YarnClientImpl: Submitted application application_1573654432484_0035
19/11/14 12:53:27 INFO mapreduce.Job: The url to track the job: http://cluster-1e8f-m:8088/proxy/application_1573654432484_0035/
19/11/14 12:53:27 INFO mapreduce.Job: Running job: job_1573654432484_0035
19/11/14 12:53:35 INFO mapreduce.Job: Job job_1573654432484_0035 running in uber mode : false
19/11/14 12:53:35 INFO mapreduce.Job:  map 0% reduce 0%
19/11/14 12:53:41 INFO mapreduce.Job:  map 100% reduce 0%
19/11/14 12:53:52 INFO mapreduce.Job:  map 100% reduce 20%
19/11/14 12:53:53 INFO mapreduce.Job:  map 100% reduce 40%
19/11/14 12:53:54 INFO mapreduce.Job:  map 100% reduce 60%
19/11/14 12:53:56 INFO mapreduce.Job:  map 100% reduce 80%
19/11/14 12:53:57 INFO mapreduce.Job:  map 100% reduce 100%
19/11/14 12:53:58 INFO mapreduce.Job: Job job_1573654432484_0035 completed successfully
19/11/14 12:53:58 INFO mapreduce.Job: Counters: 55
    File System Counters
        FILE: Number of bytes read=120
        FILE: Number of bytes written=1247665
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        GS: Number of bytes read=846
        GS: Number of bytes written=76
        GS: Number of read operations=0
        GS: Number of large read operations=0
        GS: Number of write operations=0
        HDFS: Number of bytes read=139
        HDFS: Number of bytes written=0
        HDFS: Number of read operations=1
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=0
    Job Counters 
        Killed reduce tasks=1
        Launched map tasks=1
        Launched reduce tasks=5
        Rack-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=17348
        Total time spent by all reduces in occupied slots (ms)=195920
        Total time spent by all map tasks (ms)=4337
        Total time spent by all reduce tasks (ms)=48980
        Total vcore-milliseconds taken by all map tasks=4337
        Total vcore-milliseconds taken by all reduce tasks=48980
        Total megabyte-milliseconds taken by all map tasks=8882176
        Total megabyte-milliseconds taken by all reduce tasks=100311040
    Map-Reduce Framework
        Map input records=25
        Map output records=6
        Map output bytes=78
        Map output materialized bytes=120
        Input split bytes=139
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=120
        Reduce input records=6
        Reduce output records=6
        Spilled Records=12
        Shuffled Maps =5
        Failed Shuffles=0
        Merged Map outputs=5
        GC time elapsed (ms)=1409
        CPU time spent (ms)=6350
        Physical memory (bytes) snapshot=1900220416
        Virtual memory (bytes) snapshot=21124952064
        Total committed heap usage (bytes)=1492123648
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=846
    File Output Format Counters 
        Bytes Written=76
Job output is complete

Update 3:

I updated the Reducer (per LowKey's answer), and it gives me the same output as above. It is not doing the addition I want; it ignores that operation entirely. Why?

    public static class MaxMinReducer
            extends Reducer<Text, Text, Text, IntWritable> {

        public IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {

            int value = 0;
            int sumResult = 0;
            Iterator<IntWritable> iterator = values.iterator();

            while (values.iterator().hasNext()) {
                value = iterator.next().get();
                sumResult = sumResult + value;
            }

            result.set(sumResult);
            context.write(key, result);
        }
    }

Update 4: adding my imports and driver class to figure out why my reducer won't run.

    package mapreduceprogram;

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class TempMin {

        public static void main(String[] args) throws Exception {

            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "tempmin");
            job.setJarByClass(TempMin.class);
            job.setMapperClass(MaxMinMapper.class);
            job.setReducerClass(MaxMinReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[2]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

What is wrong with it? Why isn't my reducer class running?

Are the columns separated by tabs? If so, don't expect to find a space character there.

What are you doing wrong? Well, for one thing, why do you have:

final int missing = -9999;

That doesn't make any sense.

Below that, you have some code that is apparently supposed to add two values, but you seem to be accidentally throwing items away from the list. Look at where you have:

if (values.iterator().next().get() != missing)

Well... you never saved that value, so that means you threw it away.

Another problem is that your addition is wrong... for some reason you are trying to add two values on each iteration of the loop. You should be adding one per iteration, so your loop should look like this:

int value = 0;
Iterator<IntWritable> iterator = values.iterator();
while (iterator.hasNext()) {
  value = iterator.next().get();
  if (value != missing) {
    sumResult = sumResult + value;
  }
}

The next obvious problem is that you put your output line inside the while loop:

while (values.iterator().hasNext()) {
  [...]
  context.write(key, result);
}

This means that every time an item is read into the reducer, one item is written out. I think what you want to do is read in all the items for a given key, and then write out a single reduced value (the sum). In that case, you shouldn't write your output inside the loop; it should come after it:

while ([...]) {
  [...]
}

result.set(sumResult);
context.write(key, result);
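
One more thing worth checking (a guess based on the posted class, not something the logs prove): the reducer is declared `Reducer<Text, Text, Text, IntWritable>`, but `reduce()` takes an `Iterable<IntWritable>`. Because the value types disagree, that method *overloads* rather than *overrides* the framework's `reduce()`, so Hadoop silently runs the default identity reducer instead, which would explain output that is identical to the mapper's. A plain-Java sketch of the effect, using a stand-in for Hadoop's `Reducer`:

```java
// Stand-in for Hadoop's Reducer: the default reduce() passes values through.
class FakeReducer<K, V> {
    String reduce(K key, Iterable<V> values) {
        return "identity"; // default behavior: values written out unchanged
    }
}

// Declared with value type String, but reduce() takes Iterable<Integer>:
// this OVERLOADS rather than overrides, so the default still runs.
class BrokenReducer extends FakeReducer<String, String> {
    String reduce(String key, Iterable<Integer> values) {
        return "sum";
    }
}

public class OverrideDemo {
    public static String run() {
        FakeReducer<String, String> r = new BrokenReducer();
        return r.reduce("20180101", java.util.List.of("122", "-67"));
    }
}
```

In the actual job, the fix would be to declare the class as `extends Reducer<Text, IntWritable, Text, IntWritable>` (the input value type must match what the mapper emits) and to put `@Override` on `reduce`, so the compiler flags any remaining signature mismatch instead of letting it slip through.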