MapReduce 作业不 运行 完整数据
MapReduce job doesn't run on complete data
我有包含 35 列(0-34 个位置)的数据集 (input.csv)。如果我 运行 我的 MRv2 程序,那么我得到 "ArrayIndexOutOfBoundException".
但是,如果我 运行 包含相同列的数据集快照上的程序,那么它会 运行 成功。
错误
15/07/20 11:05:55 INFO mapreduce.Job: Task Id : attempt_1437379028043_0018_m_000000_2, Status : FAILED
Error: java.lang.ArrayIndexOutOfBoundsException: 34
at lotus.staging.StageMapper.map(StageMapper.java:88)
at lotus.staging.StageMapper.map(StageMapper.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
StageMapper
package lotus.staging;
import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class StageMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] record = value.toString().split(",");
// Key
String stg_table = null;
String report_code = record[0].trim();
String product_type_description = null;
String principal_amount = record[1];
String funded = record[2].trim();
String facility_id = record[3];
String loan_id = record[4];
// Start Date
String start_date = record[5];
// Maturity Date
String end_date = record[6];
DateFormat df = new SimpleDateFormat("MM/dd/yyyy");
Date startDate;
Date endDate;
long diff;
long diffDays = 0l;
try {
startDate = df.parse(start_date);
endDate = df.parse(end_date);
df.format(startDate);
df.format(endDate);
diff = endDate.getTime() - startDate.getTime();
diffDays = diff / (24 * 60 * 60 * 1000);
} catch (ParseException e) {
e.printStackTrace();
}
// Date Diff
String date_diff = String.valueOf(diffDays);
String next_reset_date = record[7];
String interest_rate = record[8];
String base_interest_rate = record[9];
String counterparty_industry_id = record[10];
String industry_name = record[11];
String counterparty_id = record[12];
String counterparty_name = record[13];
// Bank Number
String vehicle_code = record[14];
String vehicle_description = record[15];
// Branch Number
String cost_center_code = record[16];
String branch_borrower_name = record[17];
String igl_code = record[20];
// Participation Bal Begin Month
String participated_amt = record[21];
String sys_id = record[23];
// Loan To Value
String ltv = record[26];
String accrual_status = record[27];
String country_code = record[30];
String fiscal_year = record[31];
String accounting_period = record[32];
String accounting_day = record[33];
String control_category = record[34];
// CONTROL_CATEGORY_DESC, Secred_BY_Re
if (report_code.equalsIgnoreCase("1")) {
product_type_description = "Commercial_Loan";
stg_table = "stg_lon";
} else if (report_code.equalsIgnoreCase("2")) {
product_type_description = "Mortgage_Loan";
stg_table = "stg_mgt";
} else if (report_code.equalsIgnoreCase("3")) {
product_type_description = "Installment_Loan";
stg_table = "stg_lon";
} else if (report_code.equalsIgnoreCase("4")) {
product_type_description = "Revolving Credit";
stg_table = "stg_lon";
}
// Value
String data = report_code + "," + product_type_description + ","
+ principal_amount + "," + funded + "," + facility_id + ","
+ loan_id + "," + start_date + "," + end_date + "," + date_diff
+ "," + next_reset_date + "," + interest_rate + ","
+ base_interest_rate + "," + counterparty_industry_id + ","
+ industry_name + "," + counterparty_id + ","
+ counterparty_name + "," + vehicle_code + ","
+ vehicle_description + "," + cost_center_code + ","
+ branch_borrower_name + "," + igl_code + ","
+ participated_amt + "," + sys_id + "," + ltv + ","
+ accrual_status + "," + country_code + "," + fiscal_year + ","
+ accounting_period + "," + accounting_day + ","
+ control_category;
context.write(new Text(stg_table), new Text(data));
} // map() ends
} // Mapper ends
StageReducer
package lotus.staging;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class StageReducer extends Reducer<Text, Text, Text, Text> {
private MultipleOutputs mos;
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
mos = new MultipleOutputs(context);
}
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text value : values) {
mos.write(key, value, key.toString());
}
}
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
mos.close();
}
}
StageDriver
package lotus.staging;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class StageDriver {
// Main
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "StageDriver");
// conf.set("mapreduce.textoutputformat.separator", ",");
// conf.set("mapreduce.output.textoutputformat.separator", ",");
//conf.set("mapreduce.output.key.field.separator", ",");
job.setJarByClass(StageDriver.class);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
// Mapper and Mapper-Output Key
job.setMapperClass(StageMapper.class);
job.setMapOutputKeyClass(Text.class);
conf.set("mapred.max.split.size", "1020");
// Reducer and Output Key and Value
job.setReducerClass(StageReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// Input parameters to execute
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// deleting the output path automatically from hdfs so that we don't
// have delete it explicitly
// outputPath.getFileSystem(conf).delete(outputPath);
// exiting the job only if the flag value becomes false
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
以下是数据集
Snapshot-Dataset Complete-Dataset
请协助
input.csv 中的某一行不完整或存在格式错误(转义不当)。试着弄清楚它是哪一行。您可以捕获发生此错误的异常并打印出行号并修复您的数据。
try {
CODE WHERE THE OUTOFBOUNDS HAPPENS
}
catch (Exception e) {
LOG.warn(String.format("Invalid data in row: %d", row));
System.out.println(String.format("Invalid data in row: %d", row));
}
所以在你的情况下,这意味着:
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] record = value.toString().split(",");
// Key
String stg_table = null;
try{
String report_code = record[0].trim();
String product_type_description = null;
String principal_amount = record[1];
String funded = record[2].trim();
String facility_id = record[3];
String loan_id = record[4];
// Start Date
String start_date = record[5];
// Maturity Date
String end_date = record[6];
DateFormat df = new SimpleDateFormat("MM/dd/yyyy");
Date startDate;
Date endDate;
long diff;
long diffDays = 0l;
try {
startDate = df.parse(start_date);
endDate = df.parse(end_date);
df.format(startDate);
df.format(endDate);
diff = endDate.getTime() - startDate.getTime();
diffDays = diff / (24 * 60 * 60 * 1000);
} catch (ParseException e) {
e.printStackTrace();
}
// Date Diff
String date_diff = String.valueOf(diffDays);
String next_reset_date = record[7];
String interest_rate = record[8];
String base_interest_rate = record[9];
String counterparty_industry_id = record[10];
String industry_name = record[11];
String counterparty_id = record[12];
String counterparty_name = record[13];
// Bank Number
String vehicle_code = record[14];
String vehicle_description = record[15];
// Branch Number
String cost_center_code = record[16];
String branch_borrower_name = record[17];
String igl_code = record[20];
// Participation Bal Begin Month
String participated_amt = record[21];
String sys_id = record[23];
// Loan To Value
String ltv = record[26];
String accrual_status = record[27];
String country_code = record[30];
String fiscal_year = record[31];
String accounting_period = record[32];
String accounting_day = record[33];
String control_category = record[34];
}
catch (Exception e) {
if {record.size() > 0} {
// LOG.warn(String.format("Invalid data in row: %s", record[0].trim()));
System.out.println(String.format("Invalid data in record id: %s", record[0].trim()));}
else{
System.out.println("Empty Record Found");
}
return void;
}
...
我正在使用记录 ID,因为您没有行号,但是您可以在其中搜索该记录 ID。并且大概在您的记录中至少有第一个条目。否则你也可以检查记录是否为空。
我有包含 35 列(0-34 个位置)的数据集 (input.csv)。如果我 运行 我的 MRv2 程序,那么我得到 "ArrayIndexOutOfBoundException".
但是,如果我 运行 包含相同列的数据集快照上的程序,那么它会 运行 成功。
错误
15/07/20 11:05:55 INFO mapreduce.Job: Task Id : attempt_1437379028043_0018_m_000000_2, Status : FAILED
Error: java.lang.ArrayIndexOutOfBoundsException: 34
at lotus.staging.StageMapper.map(StageMapper.java:88)
at lotus.staging.StageMapper.map(StageMapper.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
StageMapper
package lotus.staging;
import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class StageMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] record = value.toString().split(",");
// Key
String stg_table = null;
String report_code = record[0].trim();
String product_type_description = null;
String principal_amount = record[1];
String funded = record[2].trim();
String facility_id = record[3];
String loan_id = record[4];
// Start Date
String start_date = record[5];
// Maturity Date
String end_date = record[6];
DateFormat df = new SimpleDateFormat("MM/dd/yyyy");
Date startDate;
Date endDate;
long diff;
long diffDays = 0l;
try {
startDate = df.parse(start_date);
endDate = df.parse(end_date);
df.format(startDate);
df.format(endDate);
diff = endDate.getTime() - startDate.getTime();
diffDays = diff / (24 * 60 * 60 * 1000);
} catch (ParseException e) {
e.printStackTrace();
}
// Date Diff
String date_diff = String.valueOf(diffDays);
String next_reset_date = record[7];
String interest_rate = record[8];
String base_interest_rate = record[9];
String counterparty_industry_id = record[10];
String industry_name = record[11];
String counterparty_id = record[12];
String counterparty_name = record[13];
// Bank Number
String vehicle_code = record[14];
String vehicle_description = record[15];
// Branch Number
String cost_center_code = record[16];
String branch_borrower_name = record[17];
String igl_code = record[20];
// Participation Bal Begin Month
String participated_amt = record[21];
String sys_id = record[23];
// Loan To Value
String ltv = record[26];
String accrual_status = record[27];
String country_code = record[30];
String fiscal_year = record[31];
String accounting_period = record[32];
String accounting_day = record[33];
String control_category = record[34];
// CONTROL_CATEGORY_DESC, Secred_BY_Re
if (report_code.equalsIgnoreCase("1")) {
product_type_description = "Commercial_Loan";
stg_table = "stg_lon";
} else if (report_code.equalsIgnoreCase("2")) {
product_type_description = "Mortgage_Loan";
stg_table = "stg_mgt";
} else if (report_code.equalsIgnoreCase("3")) {
product_type_description = "Installment_Loan";
stg_table = "stg_lon";
} else if (report_code.equalsIgnoreCase("4")) {
product_type_description = "Revolving Credit";
stg_table = "stg_lon";
}
// Value
String data = report_code + "," + product_type_description + ","
+ principal_amount + "," + funded + "," + facility_id + ","
+ loan_id + "," + start_date + "," + end_date + "," + date_diff
+ "," + next_reset_date + "," + interest_rate + ","
+ base_interest_rate + "," + counterparty_industry_id + ","
+ industry_name + "," + counterparty_id + ","
+ counterparty_name + "," + vehicle_code + ","
+ vehicle_description + "," + cost_center_code + ","
+ branch_borrower_name + "," + igl_code + ","
+ participated_amt + "," + sys_id + "," + ltv + ","
+ accrual_status + "," + country_code + "," + fiscal_year + ","
+ accounting_period + "," + accounting_day + ","
+ control_category;
context.write(new Text(stg_table), new Text(data));
} // map() ends
} // Mapper ends
StageReducer
package lotus.staging;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public class StageReducer extends Reducer<Text, Text, Text, Text> {
private MultipleOutputs mos;
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
mos = new MultipleOutputs(context);
}
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for (Text value : values) {
mos.write(key, value, key.toString());
}
}
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
mos.close();
}
}
StageDriver
package lotus.staging;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class StageDriver {
// Main
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "StageDriver");
// conf.set("mapreduce.textoutputformat.separator", ",");
// conf.set("mapreduce.output.textoutputformat.separator", ",");
//conf.set("mapreduce.output.key.field.separator", ",");
job.setJarByClass(StageDriver.class);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
// Mapper and Mapper-Output Key
job.setMapperClass(StageMapper.class);
job.setMapOutputKeyClass(Text.class);
conf.set("mapred.max.split.size", "1020");
// Reducer and Output Key and Value
job.setReducerClass(StageReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// Input parameters to execute
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// deleting the output path automatically from hdfs so that we don't
// have delete it explicitly
// outputPath.getFileSystem(conf).delete(outputPath);
// exiting the job only if the flag value becomes false
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
以下是数据集
Snapshot-Dataset Complete-Dataset
请协助
input.csv 中的某一行不完整或存在格式错误(转义不当)。试着弄清楚它是哪一行。您可以捕获发生此错误的异常并打印出行号并修复您的数据。
try {
CODE WHERE THE OUTOFBOUNDS HAPPENS
}
catch (Exception e) {
LOG.warn(String.format("Invalid data in row: %d", row));
System.out.println(String.format("Invalid data in row: %d", row));
}
所以在你的情况下,这意味着:
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] record = value.toString().split(",");
// Key
String stg_table = null;
try{
String report_code = record[0].trim();
String product_type_description = null;
String principal_amount = record[1];
String funded = record[2].trim();
String facility_id = record[3];
String loan_id = record[4];
// Start Date
String start_date = record[5];
// Maturity Date
String end_date = record[6];
DateFormat df = new SimpleDateFormat("MM/dd/yyyy");
Date startDate;
Date endDate;
long diff;
long diffDays = 0l;
try {
startDate = df.parse(start_date);
endDate = df.parse(end_date);
df.format(startDate);
df.format(endDate);
diff = endDate.getTime() - startDate.getTime();
diffDays = diff / (24 * 60 * 60 * 1000);
} catch (ParseException e) {
e.printStackTrace();
}
// Date Diff
String date_diff = String.valueOf(diffDays);
String next_reset_date = record[7];
String interest_rate = record[8];
String base_interest_rate = record[9];
String counterparty_industry_id = record[10];
String industry_name = record[11];
String counterparty_id = record[12];
String counterparty_name = record[13];
// Bank Number
String vehicle_code = record[14];
String vehicle_description = record[15];
// Branch Number
String cost_center_code = record[16];
String branch_borrower_name = record[17];
String igl_code = record[20];
// Participation Bal Begin Month
String participated_amt = record[21];
String sys_id = record[23];
// Loan To Value
String ltv = record[26];
String accrual_status = record[27];
String country_code = record[30];
String fiscal_year = record[31];
String accounting_period = record[32];
String accounting_day = record[33];
String control_category = record[34];
}
catch (Exception e) {
if {record.size() > 0} {
// LOG.warn(String.format("Invalid data in row: %s", record[0].trim()));
System.out.println(String.format("Invalid data in record id: %s", record[0].trim()));}
else{
System.out.println("Empty Record Found");
}
return void;
}
...
我正在使用记录 ID,因为您没有行号,但是您可以在其中搜索该记录 ID。并且大概在您的记录中至少有第一个条目。否则你也可以检查记录是否为空。