How to use a multi-step mrjob with a JSON file
I'm trying to use Hadoop to get some statistics out of a JSON file, such as the most-reviewed category or the average number of stars per language. For this I'm using mrjob, and I found this code:
import re

from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol
from mrjob.step import MRStep

WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWord(MRJob):
    FILES = ['stop_words.txt']
    OUTPUT_PROTOCOL = JSONValueProtocol

    def configure_args(self):
        super(MRMostUsedWord, self).configure_args()
        # allow for alternate stop words file
        self.add_file_arg(
            '--stop-words-file',
            dest='stop_words_file',
            default=None,
            help='alternate stop words file. lowercase words, one per line',
        )

    def mapper_init(self):
        stop_words_path = self.options.stop_words_file or 'stop_words.txt'
        with open(stop_words_path) as f:
            self.stop_words = set(line.strip() for line in f)

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            word = word.lower()
            if word not in self.stop_words:
                yield (word, 1)

    def combiner_count_words(self, word, counts):
        # sum the words we've seen so far
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occurrences, word) pairs to the same reducer.
        # num_occurrences is so we can easily use Python's max() function.
        yield None, (sum(counts), word)

    # discard the key; it is just None
    def reducer_find_max_word(self, _, word_count_pairs):
        # each item of word_count_pairs is (count, word),
        # so yielding one results in key=counts, value=word
        try:
            yield max(word_count_pairs)
        except ValueError:
            pass

    def steps(self):
        return [
            MRStep(mapper_init=self.mapper_init,
                   mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)
        ]


if __name__ == '__main__':
    MRMostUsedWord.run()
It finds the most used word, but I'm not sure how to do the same kind of thing with the JSON attributes instead of words.
A sample of the JSON:
{"review_id": "en_0690095", "product_id": "product_en_0440378", "reviewer_id": "reviewer_en_0133349", "stars": "1", "review_body": "the cabinet dot were all detached from backing... got me", "review_title": "Not use able", "language": "en", "product_category": "home_improvement"}
{"review_id": "en_0311558", "product_id": "product_en_0399702", "reviewer_id": "reviewer_en_0152034", "stars": "1", "review_body": "I received my first order of this product and it was broke so I ordered it again. The second one was broke in more places than the first. I can't blame the shipping process as it's shrink wrapped and boxed.", "review_title": "The product is junk.", "language": "en", "product_category": "home"}
What would be useful for me is to use json.loads, for example:
def mapper(self, _, line):
    review = json.loads(line)
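Below is a rough, untested sketch of what I think the adapted job could look like, following the same two-step pattern as the word-count example: the mapper parses each line with json.loads and emits (product_category, (stars, 1)), the first reducer computes the average stars per category, and the second reducer picks the category with the highest average. The class and method names are just placeholders, and stars is cast to float because it is stored as a string in the sample data.

import json

from mrjob.job import MRJob
from mrjob.step import MRStep


class MRAvgStarsByCategory(MRJob):
    """Average star rating per product_category, then the best category."""

    def mapper_get_stars(self, _, line):
        # each input line is one JSON review object
        review = json.loads(line)
        # "stars" is a string in the sample data, so convert it
        yield review['product_category'], (float(review['stars']), 1)

    def combiner_sum(self, category, pairs):
        # partial sums of (total_stars, review_count) per category
        total, count = 0.0, 0
        for stars, n in pairs:
            total += stars
            count += n
        yield category, (total, count)

    def reducer_avg(self, category, pairs):
        total, count = 0.0, 0
        for stars, n in pairs:
            total += stars
            count += n
        # send every (average, category) pair to a single reducer in the next step
        yield None, (total / count, category)

    def reducer_find_best(self, _, avg_category_pairs):
        # (average, category) tuples compare on the average first
        yield max(avg_category_pairs)

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_stars,
                   combiner=self.combiner_sum,
                   reducer=self.reducer_avg),
            MRStep(reducer=self.reducer_find_best),
        ]


if __name__ == '__main__':
    MRAvgStarsByCategory.run()

The same structure should work for the per-language average by keying on review['language'] instead of review['product_category'], but I haven't verified this against the full dataset.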