MRJob 排序减速器输出
MRJob sort reducer output
有什么方法可以使用 mrjob 对 reducer 函数的输出进行排序吗?
我认为 reducer 函数的输入是按键排序的,我试图利用此功能使用另一个 reducer 对输出进行排序,如下所示,我知道值具有数值,我想计算每个键的数量并根据此计数对键进行排序:
def mapper_1(self, key, line):
key = #extract key from the line
yield (key, 1)
def reducer_1(self, key, values):
yield key, sum(values)
def mapper_2(self, key, count):
yield ('%020d' % int(count), key)
def reducer_2(self, count, keys):
for key in keys:
yield key, int(count)
但它的输出没有正确排序!我怀疑这种奇怪的行为是由于将 int
s 操纵为 string
并尝试将其格式化为 this link 所说但它没有用!
重要说明: 当我使用调试器查看 reducer_2
的输出顺序时,顺序是正确的,但打印为输出的是其他内容!! !
重要说明 2: 在另一台计算机上相同的程序处理相同的数据 returns 输出按预期排序!
您可以在第二个 reducer 中将值排序为整数,然后将它们转换为零填充表示:
import re
from mrjob.job import MRJob
from mrjob.step import MRStep
WORD_RE = re.compile(r"[\w']+")
class MRWordFrequencyCount(MRJob):
def steps(self):
return [
MRStep(
mapper=self.mapper_extract_words, combiner=self.combine_word_counts,
reducer=self.reducer_sum_word_counts
),
MRStep(
reducer=self.reduce_sort_counts
)
]
def mapper_extract_words(self, _, line):
for word in WORD_RE.findall(line):
yield word.lower(), 1
def combine_word_counts(self, word, counts):
yield word, sum(counts)
def reducer_sum_word_counts(self, key, values):
yield None, (sum(values), key)
def reduce_sort_counts(self, _, word_counts):
for count, key in sorted(word_counts, reverse=True):
yield ('%020d' % int(count), key)
好吧,这是在内存中对输出进行排序,这可能是一个问题,具体取决于输入的大小。但是你想要它排序所以它必须以某种方式排序。
有什么方法可以使用 mrjob 对 reducer 函数的输出进行排序吗?
我认为 reducer 函数的输入是按键排序的,我试图利用此功能使用另一个 reducer 对输出进行排序,如下所示,我知道值具有数值,我想计算每个键的数量并根据此计数对键进行排序:
def mapper_1(self, key, line):
key = #extract key from the line
yield (key, 1)
def reducer_1(self, key, values):
yield key, sum(values)
def mapper_2(self, key, count):
yield ('%020d' % int(count), key)
def reducer_2(self, count, keys):
for key in keys:
yield key, int(count)
但它的输出没有正确排序!我怀疑这种奇怪的行为是由于将 int
s 操纵为 string
并尝试将其格式化为 this link 所说但它没有用!
重要说明: 当我使用调试器查看 reducer_2
的输出顺序时,顺序是正确的,但打印为输出的是其他内容!! !
重要说明 2: 在另一台计算机上相同的程序处理相同的数据 returns 输出按预期排序!
您可以在第二个 reducer 中将值排序为整数,然后将它们转换为零填充表示:
import re
from mrjob.job import MRJob
from mrjob.step import MRStep
WORD_RE = re.compile(r"[\w']+")
class MRWordFrequencyCount(MRJob):
def steps(self):
return [
MRStep(
mapper=self.mapper_extract_words, combiner=self.combine_word_counts,
reducer=self.reducer_sum_word_counts
),
MRStep(
reducer=self.reduce_sort_counts
)
]
def mapper_extract_words(self, _, line):
for word in WORD_RE.findall(line):
yield word.lower(), 1
def combine_word_counts(self, word, counts):
yield word, sum(counts)
def reducer_sum_word_counts(self, key, values):
yield None, (sum(values), key)
def reduce_sort_counts(self, _, word_counts):
for count, key in sorted(word_counts, reverse=True):
yield ('%020d' % int(count), key)
好吧,这是在内存中对输出进行排序,这可能是一个问题,具体取决于输入的大小。但是你想要它排序所以它必须以某种方式排序。