Calculating an Average with a Combiner in MapReduce
I have a .csv source file in the following format:
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,30.95,1,MATT,MORAL,CUREPIPE
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,1, MATT,MORAL, CUREPIPE
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,89.95,1,LELA,SMI,HASSEE
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,54.50,1,LELA,SMI,HASSEE
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,2,TOM, SON,FLACQ
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,1,DYDY,ARD,PLOUIS
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,22.00,1,DYDY,ARD, PLOUIS
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,1,DYDY,ARD, PLOUIS
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,22.00,2,TAY,ANA,VACOAS
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,35.00,3,TAY,ANA,VACOAS
I want to compute each person's average cost (sum of price * qty / total qty) in MapReduce, using a combiner, so that the result looks like this:
MATT MORAL 25.45
LELA SMI 72.225
TOM SON 19.95
DYDY ARD 20.63
TAY ANA 29.8
So I came up with the code below, which does not work (it gives me double the average). I do feel like I need an IF ELSE statement in the reducer so that it handles the output of the combiner (unique keys) differently from the output of the mapper (duplicate keys):
from mrjob.job import MRJob

class Job(MRJob):
    def mapper(self, key, value):
        words = value.strip().split(',')
        full_name = words[-3] + ' ' + words[-2]
        price, qty = float(words[-5]), int(words[-4])
        yield full_name, (price, qty)

    def combiner(self, key, values):
        totalprice, totalqty = 0, 0
        for value in values:
            totalprice += value[0] * value[1]
            totalqty += value[1]
        yield key, (totalprice, totalqty)

    def reducer(self, key, values):
        totalprice, totalqty = 0, 0
        for value in values:
            totalprice += value[0] * value[1]
            totalqty += value[1]
        average = round(totalprice / totalqty, 2)
        yield key, average

if __name__ == '__main__':
    Job.run()
I would appreciate any guidance on the reducer!
You should not weight totalprice in the reducer, since you have already done that in the combiner -
def reducer(self, key, values):
    totalprice, totalqty = 0, 0
    for value in values:
        totalprice += value[0]
        totalqty += value[1]
    average = round(totalprice / totalqty, 2)
    yield key, average
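To see why the original reducer inflates the result, it can help to trace one key by hand. This is a minimal standalone sketch (no mrjob) using the two TAY ANA rows from the sample data:

```python
# Mapper output for the two TAY ANA rows: (price, qty) pairs.
mapped = [(22.00, 2), (35.00, 3)]

# The combiner weights price by qty and pre-aggregates:
totalprice = sum(price * qty for price, qty in mapped)  # 44.0 + 105.0 = 149.0
totalqty = sum(qty for _, qty in mapped)                # 2 + 3 = 5
combined = (totalprice, totalqty)

# The reducer receives an already-weighted total, so it must only sum it.
# Re-weighting in the reducer (value[0] * value[1]) would instead compute
# totalprice * totalqty / totalqty = totalprice, which is not the average.
average = round(combined[0] / combined[1], 2)
print('TAY ANA', average)  # TAY ANA 29.8
```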
More explanation
Here is what the Hadoop docs say about using a "Combiner" -
Users can optionally specify a combiner, via Job.setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.
Introducing a "Combiner" into the mix is viable only if your reduce operation can be broken down into multiple "mini reduces" without changing the final result.
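This property is easy to check in plain Python: a sum decomposes into partial sums, but a plain average does not decompose into partial averages - unless you carry (sum, count) pairs, which is exactly the shape the mapper and combiner should emit. A small illustrative sketch:

```python
data = [10, 20, 30, 40, 50]

# A sum can be split into "mini reduces" without changing the result:
partials = [sum(data[:2]), sum(data[2:])]
assert sum(partials) == sum(data)

# A plain average cannot: averaging unequal-size partial averages is wrong.
avg = sum(data) / len(data)                                # 30.0
avg_of_avgs = (sum(data[:2]) / 2 + sum(data[2:]) / 3) / 2  # (15 + 40) / 2 = 27.5
assert avg != avg_of_avgs

# Carrying (sum, count) pairs restores decomposability:
pairs = [(sum(data[:2]), 2), (sum(data[2:]), 3)]
total = sum(p[0] for p in pairs)
count = sum(p[1] for p in pairs)
assert total / count == avg
```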
If you want the combiner and the reducer functions to be identical - then you may need to make a change in the mapper function instead - like this -
from mrjob.job import MRJob

class Job(MRJob):
    def mapper(self, key, value):
        words = value.strip().split(',')
        full_name = words[-3] + ' ' + words[-2]
        price, qty = float(words[-5]), int(words[-4])
        price = price * qty  # This is the change
        yield full_name, (price, qty)

    def combiner(self, key, values):
        totalprice, totalqty = 0, 0
        for value in values:
            totalprice += value[0]  # And change here
            totalqty += value[1]
        yield key, (totalprice, totalqty)

    def reducer(self, key, values):
        totalprice, totalqty = 0, 0
        for value in values:
            totalprice += value[0]  # Change here
            totalqty += value[1]
        average = round(totalprice / totalqty, 2)
        yield key, average

if __name__ == '__main__':
    Job.run()
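As a sanity check, the same logic can be exercised without mrjob in a standalone sketch that mimics one map → group → reduce pass over two of the sample rows (the .strip() calls on the name fields are an extra assumption here, to absorb the stray spaces visible in the sample data):

```python
from collections import defaultdict

def mapper(line):
    words = line.strip().split(',')
    full_name = words[-3].strip() + ' ' + words[-2].strip()
    price, qty = float(words[-5]), int(words[-4])
    return full_name, (price * qty, qty)  # pre-weight in the mapper

lines = [
    "Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,22.00,2,TAY,ANA,VACOAS",
    "Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,35.00,3,TAY,ANA,VACOAS",
]

# Group mapper output by key, as the shuffle phase would.
groups = defaultdict(list)
for line in lines:
    key, value = mapper(line)
    groups[key].append(value)

# Combiner and reducer now share the same shape: sum both components.
for key, values in groups.items():
    totalprice = sum(v[0] for v in values)
    totalqty = sum(v[1] for v in values)
    print(key, round(totalprice / totalqty, 2))  # TAY ANA 29.8
```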