Hadoop Map Reduce:如何为此创建一个 reduce 函数?
Hadoop Map Reduce: How to create a reduce function for this?
我碰壁了。我有以下从以前的 MR 函数生成的文件。
产品评分(我有)
0528881469 1.62
0594451647 2.28
0594481813 2.67
0972683275 4.37
1400501466 3.62
第 1 列 = product_id,第 2 列 = product_rating
相关产品(我有)
0000013714 [0005080789,0005476798,0005476216,0005064341]
0000031852 [B00JHONN1S,B002BZX8Z6,B00D2K1M3O,0000031909]
0000031887 [0000031852,0000031895,0000031909,B00D2K1M3O]
0000031895 [B002BZX8Z6,B00JHONN1S,0000031909,B008F0SU0Y]
0000031909 [B002BZX8Z6,B00JHONN1S,0000031895,B00D2K1M3O]
其中第 1 列 = product_id,第 2 列 = also_bought 产品数组
我现在尝试创建的文件将这两个文件组合成以下文件:
推荐产品(我需要)
0000013714 [<0005080789, 2.34>,<0005476798, 4.58>,<0005476216, 2.32>]
0000031852 [<0005476798, 4.58>,<0005080789, 2.34>,<0005476216, 2.32>]
0000031887 [<0005080789, 2.34>,<0005476798, 4.58>,<0005476216, 2.32>]
0000031895 [<0005476216, 2.32>,<0005476798, 4.58>,<0005080789, 2.34>]
0000031909 [<0005476216, 2.32>,<0005080789, 2.34>,<0005476798, 4.58>]
其中第 1 列 = product_id 和第 2 列 =
的元组数组
我现在完全卡住了,我以为我对此有一个计划,但事实证明这不是一个很好的计划而且没有奏效。
基于您的产品得分数据大小的两种方法:
如果您的 Product Scores 文件不是很大,您可以将其加载到 Hadoop 分布式缓存中。(现在在 Jobs 本身中可用)Job.addCacheFile()
然后,处理 Related Products 文件并在 Reducer 中获取必要的评级并将其写出。又快又脏。但是,如果 Product Scores 是一个巨大的文件,那么可能不是解决这个问题的正确方法。
减少边连接。各种可用的示例,例如,请参考此 link 以获得想法。
由于您已经定义了架构,因此可以在其之上创建配置单元表并使用查询获取输出。这会为您节省很多时间。
编辑:此外,如果您已经有 map-reduce 作业来创建此文件,您可以添加配置单元作业,它在这些减速器输出上创建外部配置单元表,然后查询它们。
我最终使用了 MapFile。我将 ProductScores
和 RelatedProducts
数据集转换为两个 MapFile,然后制作了一个 Java 程序,在需要时从这些 MapFile 中提取信息。
MapFileWriter
public class MapFileWriter {
public static void main(String[] args) {
Configuration conf = new Configuration();
Path inputFile = new Path(args[0]);
Path outputFile = new Path(args[1]);
Text txtKey = new Text();
Text txtValue = new Text();
try {
FileSystem fs = FileSystem.get(conf);
FSDataInputStream inputStream = fs.open(inputFile);
Writer writer = new Writer(conf, fs, outputFile.toString(), txtKey.getClass(), txtKey.getClass());
writer.setIndexInterval(1);
while (inputStream.available() > 0) {
String strLineInInputFile = inputStream.readLine();
String[] lstKeyValuePair = strLineInInputFile.split("\t");
txtKey.set(lstKeyValuePair[0]);
txtValue.set(lstKeyValuePair[1]);
writer.append(txtKey, txtValue);
}
writer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
MapFileReader
public class MapFileReader {
public static void main(String[] args) {
Configuration conf = new Configuration();
FileSystem fs;
Text txtKey = new Text(args[1]);
Text txtValue = new Text();
MapFile.Reader reader;
try {
fs = FileSystem.get(conf);
try {
reader = new MapFile.Reader(fs, args[0], conf);
reader.get(txtKey, txtValue);
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("The value for Key " + txtKey.toString() + " is " + txtValue.toString());
}
}
我碰壁了。我有以下从以前的 MR 函数生成的文件。
产品评分(我有)
0528881469 1.62
0594451647 2.28
0594481813 2.67
0972683275 4.37
1400501466 3.62
第 1 列 = product_id,第 2 列 = product_rating
相关产品(我有)
0000013714 [0005080789,0005476798,0005476216,0005064341]
0000031852 [B00JHONN1S,B002BZX8Z6,B00D2K1M3O,0000031909]
0000031887 [0000031852,0000031895,0000031909,B00D2K1M3O]
0000031895 [B002BZX8Z6,B00JHONN1S,0000031909,B008F0SU0Y]
0000031909 [B002BZX8Z6,B00JHONN1S,0000031895,B00D2K1M3O]
其中第 1 列 = product_id,第 2 列 = also_bought 产品数组
我现在尝试创建的文件将这两个文件组合成以下文件:
推荐产品(我需要)
0000013714 [<0005080789, 2.34>,<0005476798, 4.58>,<0005476216, 2.32>]
0000031852 [<0005476798, 4.58>,<0005080789, 2.34>,<0005476216, 2.32>]
0000031887 [<0005080789, 2.34>,<0005476798, 4.58>,<0005476216, 2.32>]
0000031895 [<0005476216, 2.32>,<0005476798, 4.58>,<0005080789, 2.34>]
0000031909 [<0005476216, 2.32>,<0005080789, 2.34>,<0005476798, 4.58>]
其中第 1 列 = product_id 和第 2 列 =
的元组数组我现在完全卡住了,我以为我对此有一个计划,但事实证明这不是一个很好的计划而且没有奏效。
基于您的产品得分数据大小的两种方法:
如果您的 Product Scores 文件不是很大,您可以将其加载到 Hadoop 分布式缓存中。(现在在 Jobs 本身中可用)
Job.addCacheFile()
然后,处理 Related Products 文件并在 Reducer 中获取必要的评级并将其写出。又快又脏。但是,如果 Product Scores 是一个巨大的文件,那么可能不是解决这个问题的正确方法。减少边连接。各种可用的示例,例如,请参考此 link 以获得想法。
由于您已经定义了架构,因此可以在其之上创建配置单元表并使用查询获取输出。这会为您节省很多时间。
编辑:此外,如果您已经有 map-reduce 作业来创建此文件,您可以添加配置单元作业,它在这些减速器输出上创建外部配置单元表,然后查询它们。
我最终使用了 MapFile。我将 ProductScores
和 RelatedProducts
数据集转换为两个 MapFile,然后制作了一个 Java 程序,在需要时从这些 MapFile 中提取信息。
MapFileWriter
public class MapFileWriter {
public static void main(String[] args) {
Configuration conf = new Configuration();
Path inputFile = new Path(args[0]);
Path outputFile = new Path(args[1]);
Text txtKey = new Text();
Text txtValue = new Text();
try {
FileSystem fs = FileSystem.get(conf);
FSDataInputStream inputStream = fs.open(inputFile);
Writer writer = new Writer(conf, fs, outputFile.toString(), txtKey.getClass(), txtKey.getClass());
writer.setIndexInterval(1);
while (inputStream.available() > 0) {
String strLineInInputFile = inputStream.readLine();
String[] lstKeyValuePair = strLineInInputFile.split("\t");
txtKey.set(lstKeyValuePair[0]);
txtValue.set(lstKeyValuePair[1]);
writer.append(txtKey, txtValue);
}
writer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
MapFileReader
public class MapFileReader {
public static void main(String[] args) {
Configuration conf = new Configuration();
FileSystem fs;
Text txtKey = new Text(args[1]);
Text txtValue = new Text();
MapFile.Reader reader;
try {
fs = FileSystem.get(conf);
try {
reader = new MapFile.Reader(fs, args[0], conf);
reader.get(txtKey, txtValue);
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("The value for Key " + txtKey.toString() + " is " + txtValue.toString());
}
}