Hadoop MapReduce 复制连接

Hadoop MapReduce Replicated Join

我需要使用复制联接实现简单的内部联接,但我发现关于如何执行此操作的信息少得令人震惊。

我有一个小于 1gb 的数据集,看起来像

A C
A D
B Y
C D

然后我的输入文件看起来非常相似

C Z
B I
A B
D Z
C O

我想对我的复制数据集的第一列和数据集的第二列进行内部联接,以便得到类似

的内容
A C Z
A C O
A D Z
C D Z

复制数据集或主数据集都不会有重复行

我知道我需要将 reducer 的数量设置为 0,并且我需要在 MAP 的设置阶段读取复制的数据集

但我不知道如何读取它,也不知道在哪里存储复制的数据集,也不知道读取文件的数据结构,或者如何进行连接。

我看到一些使用哈希映射的教程,但这不起作用,因为重复的键会相互覆盖。

这是我想出的解决方案。 我想要跨所有映射器复制的文件已加载到我的驱动程序代码

中的分布式缓存中
DistributedCache.addCacheFile(new Path(args[3]).toUri(), job.getConfiguration());

在我的映射器中,我声明了一个

类型的哈希映射
Map<String, List<String>> adjacencyList = new HashMap<>();

在我的映射器的设置阶段,我从分布式缓存中读取了所有文件

Path[] pathLength1 = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            if(pathLength1 != null && pathLength1.length > 0) {
                for(Path file : pathLength1) {
                    readFile(file);
                }
            }

读取文件的代码如下,我可以通过将相同键的所有值附加到它们自身来绕过键相互覆盖

BufferedReader bufferedReader = new BufferedReader(new FileReader(filePath.toString()));
        String stopWord = null;
        while((stopWord = bufferedReader.readLine()) != null) {
            String[] splited = stopWord.split("\s+");
            List<String> temp = new ArrayList<String>();
            if(adjacencyList.containsKey(splited[0])){
                temp = adjacencyList.get(splited[0]);
            }
            temp.add(splited[1]);
            adjacencyList.put(splited[0], temp);
        }

然后在数学阶段,当我逐行读取我的其他文件时,我可以检查我的哈希映射是否存在该键并加入它。

不需要减速器