如何在 Python 中编写 MapReduce 代码来实现矩阵转置。

How can I write a MapReduce code in Python to implement a matrix transpose.

假设输入文件是一个 .txt,我正在尝试 运行 它在集群上(如 AWS 上的 EMR)进行测试。

你想要的问题是你要求保持行的顺序,意思是

如果你有:
a,b,c
d,e,f
g,h,i

您希望输出为
a,d,g
b,e,h
c,f,i

但是 MapReduce 不是这样工作的。 MR 将获取文件并将文件拆分到不同的 map reduce 任务, 我本可以给你一个代码,它会产生类似转置的东西, 意义

可以是:
a,g,d
g,e,h
我,c,f

因为线路会被发送到同一个地图,但是线路的顺序没有保持。

除非您可以对文件进行一些预处理,并将行号作为参数添加到每一行。

如果你能做到,代码就很简单了:

假设这是你的文件(左边的数字是行索引)

  1. a,b,c,d
  2. e,f,g,h
  3. i,j,k,l

然后,当它在 emr 集群中流动时,emr 将文件分成几行。 假设行索引首先是这样的:
1,a,b,c
2,d,e,f
3,g,h,i

def map():
    for line in sys.stdin:
        splitted_line = line.strip().split()
        line_index, values = splitted_line[0], splitted_line[1:]

        for column_index, value in enumerate(values):
            # emits a record were the key in the column index, 
            # and the value is the line number and the value.
            # the '\t' is the delimiter the emr use by default to 
            # identify the separator between the key and value
            # the line looks like this:
            # "<column_index>|\t|<line_index>|<value>"
            print '|'.join(column_index,'\t',line_index,value)

def reduce():
    old_column_index = None
    line_to_emit = list()
    for line in sys.stdin:
        values = line.split("|")
        column_index, line_index, value = values[0], values[2], values[3]

        if not old_column_index:
            old_column_index = column_index
        if column_index == old_column_index:
            line_to_emit.append((line_index,value))
        else:
            old_column_index = column_index
            sorted_line = sorted(line_to_emit, key=lambda value:value[0])
            only_values_line = [value[1] for value in sorted_line]
            print ','.join(only_values_line)

但即便如此,输出中的行仍然不会按照您需要的顺序排列。你必须自己对它们进行排序,可能的方式是,你自己传递新行的索引。