Hadoop Map Reduce:如何为此创建一个 reduce 函数?

Hadoop Map Reduce: How to create a reduce function for this?

我碰壁了。我有以下从以前的 MR 函数生成的文件。

产品评分(我有)

0528881469  1.62
0594451647  2.28
0594481813  2.67
0972683275  4.37
1400501466  3.62  

第 1 列 = product_id,第 2 列 = product_rating

相关产品(我有)

0000013714  [0005080789,0005476798,0005476216,0005064341]
0000031852  [B00JHONN1S,B002BZX8Z6,B00D2K1M3O,0000031909]
0000031887  [0000031852,0000031895,0000031909,B00D2K1M3O]
0000031895  [B002BZX8Z6,B00JHONN1S,0000031909,B008F0SU0Y]
0000031909  [B002BZX8Z6,B00JHONN1S,0000031895,B00D2K1M3O]

其中第 1 列 = product_id,第 2 列 = also_bought 产品数组

我现在尝试创建的文件将这两个文件组合成以下文件:

推荐产品(我需要)

0000013714  [<0005080789, 2.34>,<0005476798, 4.58>,<0005476216, 2.32>]
0000031852  [<0005476798, 4.58>,<0005080789, 2.34>,<0005476216, 2.32>]
0000031887  [<0005080789, 2.34>,<0005476798, 4.58>,<0005476216, 2.32>]
0000031895  [<0005476216, 2.32>,<0005476798, 4.58>,<0005080789, 2.34>]
0000031909  [<0005476216, 2.32>,<0005080789, 2.34>,<0005476798, 4.58>]

其中第 1 列 = product_id 和第 2 列 =

的元组数组

我现在完全卡住了,我以为我对此有一个计划,但事实证明这不是一个很好的计划而且没有奏效。

基于您的产品得分数据大小的两种方法:

  1. 如果您的 Product Scores 文件不是很大,您可以将其加载到 Hadoop 分布式缓存中。(现在在 Jobs 本身中可用)Job.addCacheFile() 然后,处理 Related Products 文件并在 Reducer 中获取必要的评级并将其写出。又快又脏。但是,如果 Product Scores 是一个巨大的文件,那么可能不是解决这个问题的正确方法。

  2. 减少边连接。各种可用的示例,例如,请参考此 link 以获得想法。

  3. 由于您已经定义了架构,因此可以在其之上创建配置单元表并使用查询获取输出。这会为您节省很多时间。

    编辑:此外,如果您已经有 map-reduce 作业来创建此文件,您可以添加配置单元作业,它在这些减速器输出上创建外部配置单元表,然后查询它们。

我最终使用了 MapFile。我将 ProductScoresRelatedProducts 数据集转换为两个 MapFile,然后制作了一个 Java 程序,在需要时从这些 MapFile 中提取信息。

MapFileWriter

public class MapFileWriter {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        Path inputFile = new Path(args[0]);
        Path outputFile = new Path(args[1]);
        Text txtKey = new Text();
        Text txtValue = new Text();
        try {
            FileSystem fs = FileSystem.get(conf);
            FSDataInputStream inputStream = fs.open(inputFile);
            Writer writer = new Writer(conf, fs, outputFile.toString(), txtKey.getClass(), txtKey.getClass());
            writer.setIndexInterval(1);
            while (inputStream.available() > 0) {
                String strLineInInputFile = inputStream.readLine();
                String[] lstKeyValuePair = strLineInInputFile.split("\t");
                txtKey.set(lstKeyValuePair[0]);
                txtValue.set(lstKeyValuePair[1]);
                writer.append(txtKey, txtValue);
            }
            writer.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

MapFileReader

public class MapFileReader {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        FileSystem fs;
        Text txtKey = new Text(args[1]);
        Text txtValue = new Text();
        MapFile.Reader reader;
        try {
            fs = FileSystem.get(conf);
            try {
                reader = new MapFile.Reader(fs, args[0], conf);
                reader.get(txtKey, txtValue);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("The value for Key " + txtKey.toString() + " is " + txtValue.toString());
    }
}