如何在 Python 中编写 MapReduce 代码来实现矩阵转置。
How can I write a MapReduce code in Python to implement a matrix transpose.
假设输入文件是一个 .txt,我正在尝试 运行 它在集群上(如 AWS 上的 EMR)进行测试。
你想要的问题是你要求保持行的顺序,意思是
如果你有:
a,b,c
d,e,f
g,h,i
您希望输出为
a,d,g
b,e,h
c,f,i
但是 MapReduce 不是这样工作的。
MR 将获取文件并将文件拆分到不同的 map reduce 任务,
我本可以给你一个代码,它会产生类似转置的东西,
意义
可以是:
a,g,d
g,e,h
我,c,f
因为线路会被发送到同一个地图,但是线路的顺序没有保持。
除非您可以对文件进行一些预处理,并将行号作为参数添加到每一行。
如果你能做到,代码就很简单了:
假设这是你的文件(左边的数字是行索引)
- a,b,c,d
- e,f,g,h
- i,j,k,l
然后,当它在 emr 集群中流动时,emr 将文件分成几行。
假设行索引首先是这样的:
1,a,b,c
2,d,e,f
3,g,h,i
def map():
for line in sys.stdin:
splitted_line = line.strip().split()
line_index, values = splitted_line[0], splitted_line[1:]
for column_index, value in enumerate(values):
# emits a record were the key in the column index,
# and the value is the line number and the value.
# the '\t' is the delimiter the emr use by default to
# identify the separator between the key and value
# the line looks like this:
# "<column_index>|\t|<line_index>|<value>"
print '|'.join(column_index,'\t',line_index,value)
def reduce():
old_column_index = None
line_to_emit = list()
for line in sys.stdin:
values = line.split("|")
column_index, line_index, value = values[0], values[2], values[3]
if not old_column_index:
old_column_index = column_index
if column_index == old_column_index:
line_to_emit.append((line_index,value))
else:
old_column_index = column_index
sorted_line = sorted(line_to_emit, key=lambda value:value[0])
only_values_line = [value[1] for value in sorted_line]
print ','.join(only_values_line)
但即便如此,输出中的行仍然不会按照您需要的顺序排列。你必须自己对它们进行排序,可能的方式是,你自己传递新行的索引。
假设输入文件是一个 .txt,我正在尝试 运行 它在集群上(如 AWS 上的 EMR)进行测试。
你想要的问题是你要求保持行的顺序,意思是
如果你有:
a,b,c
d,e,f
g,h,i
您希望输出为
a,d,g
b,e,h
c,f,i
但是 MapReduce 不是这样工作的。 MR 将获取文件并将文件拆分到不同的 map reduce 任务, 我本可以给你一个代码,它会产生类似转置的东西, 意义
可以是:
a,g,d
g,e,h
我,c,f
因为线路会被发送到同一个地图,但是线路的顺序没有保持。
除非您可以对文件进行一些预处理,并将行号作为参数添加到每一行。
如果你能做到,代码就很简单了:
假设这是你的文件(左边的数字是行索引)
- a,b,c,d
- e,f,g,h
- i,j,k,l
然后,当它在 emr 集群中流动时,emr 将文件分成几行。
假设行索引首先是这样的:
1,a,b,c
2,d,e,f
3,g,h,i
def map():
for line in sys.stdin:
splitted_line = line.strip().split()
line_index, values = splitted_line[0], splitted_line[1:]
for column_index, value in enumerate(values):
# emits a record were the key in the column index,
# and the value is the line number and the value.
# the '\t' is the delimiter the emr use by default to
# identify the separator between the key and value
# the line looks like this:
# "<column_index>|\t|<line_index>|<value>"
print '|'.join(column_index,'\t',line_index,value)
def reduce():
old_column_index = None
line_to_emit = list()
for line in sys.stdin:
values = line.split("|")
column_index, line_index, value = values[0], values[2], values[3]
if not old_column_index:
old_column_index = column_index
if column_index == old_column_index:
line_to_emit.append((line_index,value))
else:
old_column_index = column_index
sorted_line = sorted(line_to_emit, key=lambda value:value[0])
only_values_line = [value[1] for value in sorted_line]
print ','.join(only_values_line)
但即便如此,输出中的行仍然不会按照您需要的顺序排列。你必须自己对它们进行排序,可能的方式是,你自己传递新行的索引。