如何对 json 文件使用多步 mrjob

How to use multistep mrjob with json file

我正在尝试使用 Hadoop 从 JSON 文件中获取一些统计信息,例如评论数最多的类别,或每种语言的平均星级。为此,我正在使用 mrjob,并找到了下面这段代码:

import re

from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol
from mrjob.step import MRStep

# Matches a run of word characters and apostrophes; the mapper uses it to
# pull individual words out of each input line.
WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWord(MRJob):
    """Two-step job that emits the single most frequent non-stop word.

    Step 1 counts every word that is not in the stop-word list.
    Step 2 collects all ``(count, word)`` pairs under one key and keeps the
    largest pair.
    """

    # Support file shipped with the job; mapper_init() opens it by this
    # bare name when no alternate file is given on the command line.
    FILES = ['stop_words.txt']

    # Only the value of the final (key, value) pair is written, as JSON.
    OUTPUT_PROTOCOL = JSONValueProtocol

    def configure_args(self):
        """Register the optional ``--stop-words-file`` command-line option."""
        super(MRMostUsedWord, self).configure_args()

        # allow for alternate stop words file
        self.add_file_arg(
            '--stop-words-file',
            dest='stop_words_file',
            default=None,
            help='alternate stop words file. lowercase words, one per line',
        )

    def mapper_init(self):
        """Load the stop-word list into ``self.stop_words`` before mapping."""
        # Fall back to the bundled file when the option was not supplied.
        stop_words_path = self.options.stop_words_file or 'stop_words.txt'

        with open(stop_words_path) as f:
            self.stop_words = {line.strip() for line in f}

    def mapper_get_words(self, _, line):
        """Yield ``(word, 1)`` for each lowercased non-stop word in ``line``.

        The input key is ignored.
        """
        for word in WORD_RE.findall(line):
            word = word.lower()
            if word not in self.stop_words:
                yield (word, 1)

    def combiner_count_words(self, word, counts):
        """Yield ``(word, partial_sum)`` for the counts seen so far."""
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        """Yield ``(None, (total, word))`` for one word.

        Using ``None`` as the key for every word sends all pairs to the same
        reducer in the next step. The count comes first in the tuple so that
        Python's ``max()`` compares by count.
        """
        yield None, (sum(counts), word)

    def reducer_find_max_word(self, _, word_count_pairs):
        """Yield the largest ``(count, word)`` pair as ``key=count, value=word``.

        The input key is discarded; it is just ``None``.
        """
        try:
            # A bare 2-tuple is yielded, so it unpacks as (key, value).
            yield max(word_count_pairs)
        except ValueError:
            # max() raises ValueError on an empty iterable; emit nothing.
            pass

    def steps(self):
        """Return the two MRStep stages: count words, then pick the maximum."""
        return [
            MRStep(mapper_init=self.mapper_init,
                   mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)
        ]


# Launch the job only when this file is executed as a script, not on import.
if __name__ == '__main__':
    MRMostUsedWord.run()

这段代码可以找出最常用的单词,但我不确定如何改为按 JSON 属性(而不是单词)来进行统计。

JSON 数据示例:

{"review_id": "en_0690095", "product_id": "product_en_0440378", "reviewer_id": "reviewer_en_0133349", "stars": "1", "review_body": "the cabinet dot were all detached from backing... got me", "review_title": "Not use able", "language": "en", "product_category": "home_improvement"}

{"review_id": "en_0311558", "product_id": "product_en_0399702", "reviewer_id": "reviewer_en_0152034", "stars": "1", "review_body": "I received my first order of this product and it was broke so I ordered it again. The second one was broke in more places than the first. I can't blame the shipping process as it's shrink wrapped and boxed.", "review_title": "The product is junk.", "language": "en", "product_category": "home"}

对我来说,能直接使用 json.loads 解析每一行就很有帮助,例如:

def mapper(self, _, line):
    review = json.loads(line)