在 Mapreduce 中使用 Combiner 计算平均值

Calculating Average with Combiner in Mapreduce

我有一个 .csv 源文件,格式如下:

Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,30.95,1,MATT,MORAL,CUREPIPE

Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,1, MATT,MORAL, CUREPIPE

Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,89.95,1,LELA,SMI,HASSEE

Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,54.50,1,LELA,SMI,HASSEE

Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,2,TOM, SON,FLACQ

Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,1,DYDY,ARD,PLOUIS

Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,22.00,1,DYDY,ARD, PLOUIS

Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,1,DYDY,ARD, PLOUIS

Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,22.00,2,TAY,ANA,VACOAS

Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,35.00,3,TAY,ANA,VACOAS

我想计算在 MapReduce 中使用组合器的每个人的平均成本(价格*qty/total 数量),结果如下:

MATT MORAL 25.45

LELA SMI 72.225

TOM SON 19.95

DYDY ARD 20.36

TAY ANA 29.8

所以我想出了以下无效的代码(给了我两倍的平均值)。我确实觉得我需要在 reducer 中添加一个 IF ELSE 语句来处理组合器(唯一键)的输出与映射器(重复键)的输出不同:

from mrjob.job import MRJob
class Job(MRJob):
    def mapper(self, key, value):
        words = value.strip().split(',')
        full_name = words[-3] + ' ' + words[-2]
        price, qty = float(words[-5]), int(words[-4])
        yield full_name, (price, qty)

    def combiner(self, key, values):
        totalprice, totalqty = 0,0
        for value in values:
            totalprice += (value[0] * value[1])
            totalqty += value[1]
        yield key, (totalprice, totalqty)

    def reducer(self, key, values):
        totalprice, totalqty = 0,0
        for value in values:
            totalprice += (value[0] * value[1])
            totalqty += value[1]
        average = round(totalprice/totalqty,2)
        yield key, average
        
if __name__ == '__main__':
    Job.run()

如果你能给我一些关于 reducer 的指导,我将不胜感激!

你不应该在减速器中加权 totalprice 因为你已经在组合器中做了 -

def reducer(self, key, values):
        totalprice, totalqty = 0,0
        for value in values:
            totalprice += (value[0])
            totalqty += value[1]
        average = round(totalprice/totalqty,2)
        yield key, average

更多解释

这是 Hadoop docs 关于使用“Combiner”的说法 -

Users can optionally specify a combiner, via Job.setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.

如果您的 reduce 操作可以分解为多个“mini reduce”而不改变最终结果,那么在混合中引入“Combiner”是可行的。

如果您希望 combinerreducer 函数相同 - 那么您可能需要在 mapper 函数中进行更改 -

像这样 -

from mrjob.job import MRJob
class Job(MRJob):
    def mapper(self, key, value):
        words = value.strip().split(',')
        full_name = words[-3] + ' ' + words[-2]
        price, qty = float(words[-5]), int(words[-4])
        price = price * qty # This is the change
        yield full_name, (price, qty)

    def combiner(self, key, values):
        totalprice, totalqty = 0,0
        for value in values:
            totalprice += (value[0]) # And change here
            totalqty += value[1]
        yield key, (totalprice, totalqty)

    def reducer(self, key, values):
        totalprice, totalqty = 0,0
        for value in values:
            totalprice += (value[0]) # Change here
            totalqty += value[1]
        average = round(totalprice/totalqty,2)
        yield key, average
    
if __name__ == '__main__':
    Job.run()