如何在 mapreduce 中使用多个 CSV 文件
How to use multiple CSV files in mapreduce
首先,我将解释我正在尝试做什么。首先,我将输入文件(第一个 CSV 文件)放入 mapreduce 作业,其他 CSV 文件将放入映射器 class。但事情是这样的。 mapper class 中的代码无法正常工作,例如右下角的代码。我想合并两个 CSV 文件以在每个 CSV 文件中使用多个列。
比如1个文件有BibNum(用户账号),checkoutdatetime(图书checkoutdatetime),itemtype(图书itemtype),2个CSV文件有BibNum(用户账号),Title(书名),Itemtype等。我想知道下个月可能会借到哪本书。如果您知道可以组合两个 CSV 文件的方法并在任何帮助下启发我,我将不胜感激。如果您对我的代码有任何疑问,请告诉我,我会尽力澄清。
Path p = new Path("hdfs://0.0.0.0:8020/user/training/Inventory_Sample");
FileSystem fs = FileSystem.get(conf);
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(p)));
try {
String BibNum = "Test";
//System.out.print("test");
while(br.readLine() != null){
//System.out.print("test");
if(!br.readLine().startsWith("BibNumber")) {
String subject[] = br.readLine().split(",");
BibNum = subject[0];
}
}
.
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Date;
import java.util.HashMap;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class StubMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text outkey = new Text();
//private MinMaxCountTuple outTuple = new MinMaxCountTuple();
//String csvFile = "hdfs://user/training/Inventory_Sample";
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
//conf.addResource("/etc/hadoop/conf/core-site.xml");
//conf.addResource("/etc/hadoop/conf/hdfs-site.xml");
Path p = new Path("hdfs://0.0.0.0:8020/user/training/Inventory_Sample");
FileSystem fs = FileSystem.get(conf);
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(p)));
try {
String BibNum = "Test";
//System.out.print("test");
while(br.readLine() != null){
//System.out.print("test");
if(!br.readLine().startsWith("BibNumber")) {
String subject[] = br.readLine().split(",");
BibNum = subject[0];
}
}
if(value.toString().startsWith("BibNumber"))
{
return;
}
String data[] = value.toString().split(",");
String BookType = data[2];
String DateTime = data[5];
SimpleDateFormat frmt = new SimpleDateFormat("MM/dd/yyyy hh:mm:ss a");
Date creationDate = frmt.parse(DateTime);
frmt.applyPattern("dd-MM-yyyy");
String dateTime = frmt.format(creationDate);
//outkey.set(BookType + " " + dateTime);
outkey.set(BibNum + " " + BookType + " " + dateTime);
//outUserId.set(userId);
context.write(outkey, new IntWritable(1));
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
finally{
br.close();
}
}
}
您正在映射器代码中读取 CSV 文件。
如果您使用路径在映射器中打开文件,我猜您使用的是分布式缓存,那么只有文件会随 jar 一起发送到 map reduce 应该 运行 的每个节点。
有一种合并的方法,但在映射器中没有。
您可以尝试以下方法:-
1) 为两个不同的文件编写 2 个单独的映射器。
2) 仅将 mapper 所需的那些字段发送到 reducer。
3) 在 reducer 中合并结果(因为你想加入某个特定的键)。
您可以查看多输入格式示例了解更多。
首先,我将解释我正在尝试做什么。首先,我将输入文件(第一个 CSV 文件)放入 mapreduce 作业,其他 CSV 文件将放入映射器 class。但事情是这样的。 mapper class 中的代码无法正常工作,例如右下角的代码。我想合并两个 CSV 文件以在每个 CSV 文件中使用多个列。 比如1个文件有BibNum(用户账号),checkoutdatetime(图书checkoutdatetime),itemtype(图书itemtype),2个CSV文件有BibNum(用户账号),Title(书名),Itemtype等。我想知道下个月可能会借到哪本书。如果您知道可以组合两个 CSV 文件的方法并在任何帮助下启发我,我将不胜感激。如果您对我的代码有任何疑问,请告诉我,我会尽力澄清。
Path p = new Path("hdfs://0.0.0.0:8020/user/training/Inventory_Sample");
FileSystem fs = FileSystem.get(conf);
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(p)));
try {
String BibNum = "Test";
//System.out.print("test");
while(br.readLine() != null){
//System.out.print("test");
if(!br.readLine().startsWith("BibNumber")) {
String subject[] = br.readLine().split(",");
BibNum = subject[0];
}
}
.
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Date;
import java.util.HashMap;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class StubMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text outkey = new Text();
//private MinMaxCountTuple outTuple = new MinMaxCountTuple();
//String csvFile = "hdfs://user/training/Inventory_Sample";
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
//conf.addResource("/etc/hadoop/conf/core-site.xml");
//conf.addResource("/etc/hadoop/conf/hdfs-site.xml");
Path p = new Path("hdfs://0.0.0.0:8020/user/training/Inventory_Sample");
FileSystem fs = FileSystem.get(conf);
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(p)));
try {
String BibNum = "Test";
//System.out.print("test");
while(br.readLine() != null){
//System.out.print("test");
if(!br.readLine().startsWith("BibNumber")) {
String subject[] = br.readLine().split(",");
BibNum = subject[0];
}
}
if(value.toString().startsWith("BibNumber"))
{
return;
}
String data[] = value.toString().split(",");
String BookType = data[2];
String DateTime = data[5];
SimpleDateFormat frmt = new SimpleDateFormat("MM/dd/yyyy hh:mm:ss a");
Date creationDate = frmt.parse(DateTime);
frmt.applyPattern("dd-MM-yyyy");
String dateTime = frmt.format(creationDate);
//outkey.set(BookType + " " + dateTime);
outkey.set(BibNum + " " + BookType + " " + dateTime);
//outUserId.set(userId);
context.write(outkey, new IntWritable(1));
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
finally{
br.close();
}
}
}
您正在映射器代码中读取 CSV 文件。
如果您使用路径在映射器中打开文件,我猜您使用的是分布式缓存,那么只有文件会随 jar 一起发送到 map reduce 应该 运行 的每个节点。
有一种合并的方法,但在映射器中没有。
您可以尝试以下方法:-
1) 为两个不同的文件编写 2 个单独的映射器。
2) 仅将 mapper 所需的那些字段发送到 reducer。
3) 在 reducer 中合并结果(因为你想加入某个特定的键)。 您可以查看多输入格式示例了解更多。