Python - 如何将目录作为 MapReduce 输入传递
Python - How to pass a directory as MapReduce input
我在 Python 中编写了一个简单的 MapReduce 示例。如果输入是文件,例如 text
文件,对于 运行 代码,我们只需使用以下模式:cat <data> | map | sort | reduce
,例如在我的例子中是:cat data | ./mapper.py | sort | ./reducer.py
一切正常。
但我更改了映射器和缩减器以从包含 .gz
文件的 directory
读取数据。所以我应该将 path of the directory
作为输入传递。当包含数据的目录为 dat/
时,我测试了以下终端命令 cat dat/ | ./mapper.py | sort | ./reducer.py
,但我遇到了错误:
cat: dat/: Is a directory
Traceback (most recent call last):
File "./mapper.py", line 9, in <module>
for filename in glob.glob(sys.stdin + '*.gz'):
TypeError: unsupported operand type(s) for +: 'file' and 'str'
如何在 Python 中将目录作为输入传递给 Mapreduce?
以下是我的代码:
mapper.py
#!/usr/bin/env python
import sys
#import timeit
import glob
import gzip
QUALITY = '01459'
MISSING = '+9999'
for filename in glob.glob(sys.stdin + '*.gz'):
f = gzip.open(filename, 'r')
for line in f:
val = line.strip()
(year, temp, q) = (val[15:19], val[87:92], val[92:93])
if temp != MISSING and q in QUALITY:
print " %s\t%s" % (year, temp)
reducer.py
#!/usr/bin/env python
import sys
max_val = -sys.maxint
key = ''
for line in sys.stdin:
(key, val) = line.strip().split('\t')
max_val = max(max_val, int(val))
print "The last IF %s\t%s" % (key, max_val)
要获取当前工作目录的路径,请使用:
import os
path = os.getcwd()
您可以从该文件中获取所有文件:
filenames = os.listdir(path)
# filter files that doesn't have .gz filetype
filenames = [file_name for file_name in filenames if file_name.endswith('.gz')]
您可以简单地迭代文件:
for filename in filenames:
f = gzip.open(path+filename, 'r')
行 for filename in glob.glob(sys.stdin + '*.gz'):
需要来自 stdin
的字符串。因此,只需传递一个字符串 (echo
) 而不是文件内容 (cat
):
$ echo dat/ | ./mapper.py | sort | ./reducer.py
但是,为什么要通过管道传递参数?通常参数由 python 通过 sys.argv
直接传递和读取(或者通过诸如“argparse”之类的解释器更好)。
我在 Python 中编写了一个简单的 MapReduce 示例。如果输入是文件,例如 text
文件,对于 运行 代码,我们只需使用以下模式:cat <data> | map | sort | reduce
,例如在我的例子中是:cat data | ./mapper.py | sort | ./reducer.py
一切正常。
但我更改了映射器和缩减器以从包含 .gz
文件的 directory
读取数据。所以我应该将 path of the directory
作为输入传递。当包含数据的目录为 dat/
时,我测试了以下终端命令 cat dat/ | ./mapper.py | sort | ./reducer.py
,但我遇到了错误:
cat: dat/: Is a directory
Traceback (most recent call last):
File "./mapper.py", line 9, in <module>
for filename in glob.glob(sys.stdin + '*.gz'):
TypeError: unsupported operand type(s) for +: 'file' and 'str'
如何在 Python 中将目录作为输入传递给 Mapreduce?
以下是我的代码:
mapper.py
#!/usr/bin/env python
import sys
#import timeit
import glob
import gzip
QUALITY = '01459'
MISSING = '+9999'
for filename in glob.glob(sys.stdin + '*.gz'):
f = gzip.open(filename, 'r')
for line in f:
val = line.strip()
(year, temp, q) = (val[15:19], val[87:92], val[92:93])
if temp != MISSING and q in QUALITY:
print " %s\t%s" % (year, temp)
reducer.py
#!/usr/bin/env python
import sys
max_val = -sys.maxint
key = ''
for line in sys.stdin:
(key, val) = line.strip().split('\t')
max_val = max(max_val, int(val))
print "The last IF %s\t%s" % (key, max_val)
要获取当前工作目录的路径,请使用:
import os
path = os.getcwd()
您可以从该文件中获取所有文件:
filenames = os.listdir(path)
# filter files that doesn't have .gz filetype
filenames = [file_name for file_name in filenames if file_name.endswith('.gz')]
您可以简单地迭代文件:
for filename in filenames:
f = gzip.open(path+filename, 'r')
行 for filename in glob.glob(sys.stdin + '*.gz'):
需要来自 stdin
的字符串。因此,只需传递一个字符串 (echo
) 而不是文件内容 (cat
):
$ echo dat/ | ./mapper.py | sort | ./reducer.py
但是,为什么要通过管道传递参数?通常参数由 python 通过 sys.argv
直接传递和读取(或者通过诸如“argparse”之类的解释器更好)。