How to map/reduce with sum and max date?

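I have a file that I need to map/reduce, where the output needs a sum and the maximum of a date. I have the sum part working, but I'm not sure how to include the max date as part of the reduce output.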

The input data looks like this:

ID1,  ID2, date,                count
3000, 001, 2014-12-30 18:00:00, 2
3000, 001, 2015-01-01 10:00:00, 1
3000, 002, 2014-11-18 12:53:00, 5
3000, 002, 2014-12-20 20:14:00, 3

My mapper concatenates ID1 + ID2 so that records are grouped on the combined key. Its output looks like this:

key (ID1|ID2), value (count)
3000|001,      2
3000|001,      1
3000|002,      5
3000|002,      3

The reducer output looks like this:

key (ID1|ID2), value (sum)
3000|001,      3
3000|002,      8

What I really need is output like this:

key (ID1|ID2), value (sum), date (max)
3000|001,      3,           2015-01-01 10:00:00
3000|002,      8,           2014-12-20 20:14:00
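
For reference, the target values can be reproduced in memory with a few lines of Ruby. This is only a sketch to pin down the expected result, not a streaming reducer; `ROWS` and `summarize` are hypothetical names, and the rows are the sample input shown earlier.

```ruby
# Sample rows from the question: ID1, ID2, date, count.
ROWS = [
  ['3000', '001', '2014-12-30 18:00:00', 2],
  ['3000', '001', '2015-01-01 10:00:00', 1],
  ['3000', '002', '2014-11-18 12:53:00', 5],
  ['3000', '002', '2014-12-20 20:14:00', 3]
].freeze

# Group by "ID1|ID2", then compute the count sum and the latest date per group.
# Timestamps in "YYYY-MM-DD HH:MM:SS" form sort correctly as plain strings.
def summarize(rows)
  grouped = rows.group_by { |id1, id2, _date, _count| "#{id1}|#{id2}" }
  grouped.map do |key, group|
    [key, group.sum { |r| r[3] }, group.map { |r| r[2] }.max]
  end
end

summarize(ROWS).each { |row| puts row.join("\t") }
```

Run as-is, this should print one tab-separated line per key with the sum and the latest date, matching the table above.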

The mapper and reducer are written in Ruby; however, I'd accept a working example written in Python (I'll translate it to Ruby).

Here's the mapper code:

require 'csv'

pattern = File.join(File.expand_path('data', File.dirname(__FILE__)), '*.txt')

Dir.glob(pattern).each do |file|
  # Options are passed as keyword arguments so this also works on Ruby 3+
  CSV.foreach(file, col_sep: "\t", headers: false) do |row|
    puts [
           "#{row[6]}|#{row[3].rjust(8, '0')}", # key = ID1 | ID2
           row[7] # value = count
         ].join("\t")
  end
end

And the reducer:

prev_key  = nil
key_total = 0

ARGF.each do |line|
  line = line.chomp
  next if line.empty? # skip blank lines (a chomped line is never nil)

  (key, value) = line.split("\t")

  # check for new key
  if prev_key && key != prev_key && key_total > 0

    # output total for previous key
    puts [prev_key, key_total].join("\t")

    # reset key total for new key
    prev_key  = key
    key_total = 0

  elsif !prev_key
    prev_key = key

  end

  # add to count for this current key
  key_total += value.to_i

end

# this is to catch the final counts after all records have been received
puts [prev_key, key_total].join("\t")

Update

Here are the new mapper and reducer, based on the accepted answer's suggestion:

Mapper:

require 'csv'

pattern = File.join(File.expand_path('data', File.dirname(__FILE__)), '*.txt')

Dir.glob(pattern).each do |file|
  CSV.foreach(file, col_sep: "\t", headers: false) do |row| # keyword options also work on Ruby 3+
    date_time = "#{row[0]} #{row[1]}:00:00#{row[2]}" # %Y-%m-%d %H:%M:%S%z
    puts [
             "#{row[6]}|#{row[3].rjust(8, '0')}", # key = ID1 | ID2
             "#{row[7]}|#{date_time}", # value = count | date_time
         ].join("\t")
  end
end

Reducer:

require 'date'

prev_key  = nil
key_total = 0
dates = []

ARGF.each do |line|
  line = line.chomp
  next if line.empty? # skip blank lines (a chomped line is never nil)

  (key, values) = line.split("\t")
  (value, date_time) = values.split('|')

  # check for new key
  if prev_key && key != prev_key && key_total > 0

    # output total for previous key
    puts [prev_key.split('|'), key_total, dates.max].join("\t")

    # reset key total for new key
    prev_key  = key
    key_total = 0

    # reset dates array for new key
    dates.clear

  elsif !prev_key
    prev_key = key

  end

  # add date to array for this current key
  dates << DateTime.strptime(date_time, '%Y-%m-%d %H:%M:%S%z')

  # add to count for this current key
  key_total += value.to_i

end

# this is to catch the final counts after all records have been received
puts [prev_key.split('|'), key_total, dates.max].join("\t") 

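You just need to put the date and the count into a pair and emit that pair as the mapper's value. Then, in the reducer, extract the date and the count from the pair. Compute the sum as you already do, and keep track of the maximum date seen among the input values for each key.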
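
A minimal Ruby sketch of that idea follows. It keeps a running maximum per key instead of collecting every date in an array. The helper names `map_row` and `reduce_lines` are hypothetical, the `count|date` value layout is one possible encoding of the pair, and the sketch assumes the reducer input is already sorted by key and that all timestamps share one format and time zone, so they can be compared as plain strings. If offsets differ, parse the values (for example with `DateTime.strptime`) before comparing.

```ruby
# Hypothetical mapper helper: emit "ID1|ID2<TAB>count|date" for one input row.
def map_row(id1, id2, date, count)
  "#{id1}|#{id2}\t#{count}|#{date}"
end

# Hypothetical reducer helper: takes lines sorted by key and returns
# "key<TAB>sum<TAB>max_date" lines, one per key.
def reduce_lines(lines)
  out = []
  prev_key = nil
  total = 0
  max_date = nil

  lines.each do |line|
    line = line.chomp
    next if line.empty?

    key, value = line.split("\t", 2)
    count, date = value.split('|', 2)

    # Key changed: flush the finished group and start a new one.
    if prev_key && key != prev_key
      out << [prev_key, total, max_date].join("\t")
      total = 0
      max_date = nil
    end
    prev_key = key

    total += count.to_i
    # Assumption: same format and zone, so string order equals time order.
    max_date = date if max_date.nil? || date > max_date
  end

  # Flush the last group, if any input was seen.
  out << [prev_key, total, max_date].join("\t") if prev_key
  out
end

# Sample rows from the question, run through both steps.
mapped = [
  map_row('3000', '001', '2014-12-30 18:00:00', 2),
  map_row('3000', '001', '2015-01-01 10:00:00', 1),
  map_row('3000', '002', '2014-11-18 12:53:00', 5),
  map_row('3000', '002', '2014-12-20 20:14:00', 3)
]
puts reduce_lines(mapped)
```

In a real reducer script the same function could be fed from standard input, for example with `puts reduce_lines(ARGF.each_line)`. Tracking only the running maximum keeps memory use constant per key, which matters when a key has many records.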