Map Reduce Wrong Output / Reducer not working
I am trying to collect the maximum and minimum temperatures for a specific station and then find the sum of the temperatures for the different days, but I keep getting an error in the mapper. I have tried many other approaches, such as using a StringTokenizer, but the result is the same: I get an error.
Sample input:
Station, Date (YYYYMMDD), Element, Temperature, flag1, flag2, otherValue
From the input I only need Station, Date (the key), Element, and Temperature.
USW00003889,20180101,TMAX,122,7,1700
USW00003889,20180101,TMIN,-67,7,1700
UK000056225,20180101,TOBS,56,7,1700
UK000056225,20180101,PRCP,0,7,1700
UK000056225,20180101,SNOW,0,7
USC00264341,20180101,SNWD,0,7,1700
USC00256837,20180101,PRCP,0,7,800
UK000056225,20180101,SNOW,0,7
UK000056225,20180101,SNWD,0,7,800
USW00003889,20180102,TMAX,12,E
USW00003889,20180102,TMIN,3,E
UK000056225,20180101,PRCP,42,E
SWE00138880,20180101,PRCP,50,E
UK000056225,20180101,PRCP,0,a
USC00256480,20180101,PRCP,0,7,700
USC00256480,20180101,SNOW,0,7
USC00256480,20180101,SNWD,0,7,700
SWE00138880,20180103,TMAX,-228,7,800
SWE00138880,20180103,TMIN,-328,7,800
USC00247342,20180101,PRCP,0,7,800
UK000056225,20180101,SNOW,0,7
SWE00137764,20180101,PRCP,63,E
UK000056225,20180101,SNWD,0,E
USW00003889,20180104,TMAX,-43,W
USW00003889,20180104,TMIN,-177,W
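For orientation, the end result the job is after can be sketched in plain Java without Hadoop (the class and method names here are hypothetical, not part of the job): filter the rows to station USW00003889 with element TMAX or TMIN, then sum the readings per date.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SumByDateSketch {
    // Sum the temperature field (index 3) per date (index 1), keeping only
    // the target station's TMAX/TMIN rows -- this mirrors the mapper's filter
    // plus the intended per-date reduction.
    static Map<String, Integer> sumByDate(String[] lines, String stationID) {
        Map<String, Integer> sums = new LinkedHashMap<>();
        for (String line : lines) {
            String[] t = line.split(",");
            if (t.length >= 4 && t[0].equals(stationID)
                    && (t[2].equals("TMAX") || t[2].equals("TMIN"))) {
                sums.merge(t[1], Integer.parseInt(t[3]), Integer::sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        String[] sample = {
            "USW00003889,20180101,TMAX,122,7,1700",
            "USW00003889,20180101,TMIN,-67,7,1700",
            "UK000056225,20180101,TOBS,56,7,1700",
            "USW00003889,20180102,TMAX,12,E",
            "USW00003889,20180102,TMIN,3,E",
            "USW00003889,20180104,TMAX,-43,W",
            "USW00003889,20180104,TMIN,-177,W"
        };
        sumByDate(sample, "USW00003889")
            .forEach((date, sum) -> System.out.println(date + "\t" + sum));
    }
}
```

Run against the sample rows above, this prints one summed line per date (for 20180101, 122 + (-67) = 55), which matches the desired output shown later in the question.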
public static class MaxMinMapper
        extends Mapper<Object, Text, Text, IntWritable> {

    private Text newDate = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String stationID = "USW00003889";
        String[] tokens = value.toString().split(",");
        String station = "";
        String date = "";
        String element = "";
        int data = 0;
        station = tokens[0];
        date = tokens[1];
        element = tokens[2];
        data = Integer.parseInt(tokens[3]);
        if (stationID.equals(station) && (element.equals("TMAX") ||
                element.equals("TMIN"))) {
            newDate.set(date);
            context.write(newDate, new IntWritable(data));
        }
    }
}
public static class MaxMinReducer
        extends Reducer<Text, Text, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sumResult = 0;
        int val1 = 0;
        int val2 = 0;
        while (values.iterator().hasNext()) {
            val1 = values.iterator().next().get();
            val2 = values.iterator().next().get();
            sumResult = val1 + val2;
        }
        result.set(sumResult);
        context.write(key, result);
    }
}
}
Please help me, thank you.
Update: validated each row with a conditional and changed the data variable to a String (to be converted back later: Integer -> IntWritable).
if (tokens.length <= 5) {
    station = tokens[0];
    date = tokens[1];
    element = tokens[2];
    data = tokens[3];
    otherValue = tokens[4];
} else {
    station = tokens[0];
    date = tokens[1];
    element = tokens[2];
    data = tokens[3];
    otherValue = tokens[4];
    otherValue2 = tokens[5];
}
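A slightly tighter version of that guard, as a standalone sketch (the RowGuard name is hypothetical), checks the minimum field count once before indexing, so short rows are skipped instead of throwing an ArrayIndexOutOfBoundsException:

```java
public class RowGuard {
    // Returns {station, date, element, temperature} for a well-formed row,
    // or null when the line has fewer than the four required fields.
    static String[] requiredFields(String line) {
        String[] tokens = line.split(",");
        if (tokens.length < 4) {
            return null;  // malformed row: skip it instead of crashing the mapper
        }
        return new String[] { tokens[0], tokens[1], tokens[2], tokens[3] };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(
            requiredFields("UK000056225,20180101,SNOW,0,7")));
        System.out.println(requiredFields("badrow"));
    }
}
```

The mapper would then emit only when the helper returns non-null, so optional trailing fields never need separate branches.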
Update 2: OK, I am now writing output to a file, but it is the wrong output. I need it to add the two values that share the same date (the key). What am I doing wrong?
OUTPUT:
20180101 -67
20180101 122
20180102 3
20180102 12
20180104 -177
20180104 -43
Desired output:
20180101 55
20180102 15
20180104 -220
This is also the error I am receiving, even though I do get output.
ERROR: (gcloud.dataproc.jobs.submit.hadoop) Job [8e31c44ccd394017a4a28b3b16471aca] failed with error:
Google Cloud Dataproc Agent reports job failure. If logs are available, they can be found at 'https://console.cloud.google.com/dataproc/jobs/8e31c44ccd394017a4a28b3b16471aca
?project=driven-airway-257512&region=us-central1' and in 'gs://dataproc-261a376e-7874-4151-b6b7-566c18758206-us-central1/google-cloud-dataproc-metainfo/f912a2f0-107f-40b6-94
56-b6a72cc8bfc4/jobs/8e31c44ccd394017a4a28b3b16471aca/driveroutput'.
19/11/14 12:53:24 INFO client.RMProxy: Connecting to ResourceManager at cluster-1e8f-m/10.128.0.12:8032
19/11/14 12:53:25 INFO client.AHSProxy: Connecting to Application History server at cluster-1e8f-m/10.128.0.12:10200
19/11/14 12:53:26 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
19/11/14 12:53:26 INFO input.FileInputFormat: Total input files to process : 1
19/11/14 12:53:26 INFO mapreduce.JobSubmitter: number of splits:1
19/11/14 12:53:26 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
19/11/14 12:53:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1573654432484_0035
19/11/14 12:53:27 INFO impl.YarnClientImpl: Submitted application application_1573654432484_0035
19/11/14 12:53:27 INFO mapreduce.Job: The url to track the job: http://cluster-1e8f-m:8088/proxy/application_1573654432484_0035/
19/11/14 12:53:27 INFO mapreduce.Job: Running job: job_1573654432484_0035
19/11/14 12:53:35 INFO mapreduce.Job: Job job_1573654432484_0035 running in uber mode : false
19/11/14 12:53:35 INFO mapreduce.Job: map 0% reduce 0%
19/11/14 12:53:41 INFO mapreduce.Job: map 100% reduce 0%
19/11/14 12:53:52 INFO mapreduce.Job: map 100% reduce 20%
19/11/14 12:53:53 INFO mapreduce.Job: map 100% reduce 40%
19/11/14 12:53:54 INFO mapreduce.Job: map 100% reduce 60%
19/11/14 12:53:56 INFO mapreduce.Job: map 100% reduce 80%
19/11/14 12:53:57 INFO mapreduce.Job: map 100% reduce 100%
19/11/14 12:53:58 INFO mapreduce.Job: Job job_1573654432484_0035 completed successfully
19/11/14 12:53:58 INFO mapreduce.Job: Counters: 55
File System Counters
FILE: Number of bytes read=120
FILE: Number of bytes written=1247665
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
GS: Number of bytes read=846
GS: Number of bytes written=76
GS: Number of read operations=0
GS: Number of large read operations=0
GS: Number of write operations=0
HDFS: Number of bytes read=139
HDFS: Number of bytes written=0
HDFS: Number of read operations=1
HDFS: Number of large read operations=0
HDFS: Number of write operations=0
Job Counters
Killed reduce tasks=1
Launched map tasks=1
Launched reduce tasks=5
Rack-local map tasks=1
Total time spent by all maps in occupied slots (ms)=17348
Total time spent by all reduces in occupied slots (ms)=195920
Total time spent by all map tasks (ms)=4337
Total time spent by all reduce tasks (ms)=48980
Total vcore-milliseconds taken by all map tasks=4337
Total vcore-milliseconds taken by all reduce tasks=48980
Total megabyte-milliseconds taken by all map tasks=8882176
Total megabyte-milliseconds taken by all reduce tasks=100311040
Map-Reduce Framework
Map input records=25
Map output records=6
Map output bytes=78
Map output materialized bytes=120
Input split bytes=139
Combine input records=0
Combine output records=0
Reduce input groups=3
Reduce shuffle bytes=120
Reduce input records=6
Reduce output records=6
Spilled Records=12
Shuffled Maps =5
Failed Shuffles=0
Merged Map outputs=5
GC time elapsed (ms)=1409
CPU time spent (ms)=6350
Physical memory (bytes) snapshot=1900220416
Virtual memory (bytes) snapshot=21124952064
Total committed heap usage (bytes)=1492123648
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=846
File Output Format Counters
Bytes Written=76
Job output is complete
Update 3:
I updated the Reducer (per what LowKey said) and it gives me the same output as above. It is not doing the addition I want it to do; it ignores that operation entirely. Why?
public static class MaxMinReducer
        extends Reducer<Text, Text, Text, IntWritable> {

    public IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int value = 0;
        int sumResult = 0;
        Iterator<IntWritable> iterator = values.iterator();
        while (values.iterator().hasNext()) {
            value = iterator.next().get();
            sumResult = sumResult + value;
        }
        result.set(sumResult);
        context.write(key, result);
    }
}
Update 4: adding my imports and driver class to figure out why my reducer won't run:
package mapreduceprogram;

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TempMin {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "tempmin");
        job.setJarByClass(TempMin.class);
        job.setMapperClass(MaxMinMapper.class);
        job.setReducerClass(MaxMinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Is anything wrong here? Why isn't my reducer class running?
Are the columns separated by tabs?
If so, then don't expect to find a space character there.
What are you doing wrong? Well, for one thing, why do you have:

final int missing = -9999;

That doesn't make any sense.
Below that, you have some code that is apparently supposed to add two values, but it looks like you are accidentally throwing items away from the list. Look at where you have:

if (values.iterator().next().get() != missing)

Well... you never saved that value, which means you threw it away.
Another problem is that you are adding incorrectly... for some reason you were trying to add two values for every iteration of the loop. You should be adding one, so your loop should look like this:
int value;
Iterator<IntWritable> iterator = values.iterator();
while (iterator.hasNext()) {
    value = iterator.next().get();
    if (value != missing) {
        sumResult = sumResult + value;
    }
}
The next obvious problem is that you put your output line inside the while loop:
while (values.iterator().hasNext()) {
    [...]
    context.write(key, result);
}
That means that every time an item is read into the reducer, an item gets written out. I think what you are trying to do is read in all the items for a given key, and then write out a single reduced value (the sum). In that case, you should not write output inside the loop. It should come after it:

while ([...]) {
    [...]
}
result.set(sumResult);
context.write(key, result);
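Putting the answer's two corrections together (accumulate one value per loop iteration through a single saved iterator, then write once after the loop), the reduce-side arithmetic can be sketched in plain Java, with Iterator<Integer> standing in for Hadoop's IntWritable iterator so it runs standalone (class and method names are hypothetical):

```java
import java.util.Arrays;
import java.util.Iterator;

public class ReduceSketch {
    static final int MISSING = -9999;  // sentinel from the answer's example

    // Sum all non-missing values for one key, consuming a single iterator.
    static int sumValues(Iterator<Integer> values) {
        int sumResult = 0;
        while (values.hasNext()) {        // reuse the SAME iterator each pass
            int value = values.next();    // save the value before testing it
            if (value != MISSING) {
                sumResult += value;
            }
        }
        return sumResult;                 // one result per key, after the loop
    }

    public static void main(String[] args) {
        // 20180101 -> TMAX 122, TMIN -67
        System.out.println(sumValues(Arrays.asList(122, -67).iterator()));
    }
}
```

In the real reducer the returned sum would be wrapped in the reused IntWritable and written with context.write(key, result) exactly once per key.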