在单个 mapreduce 中同时产生最大值和最小值
Yield both max and min in a single mapreduce
我是一名初学者,刚开始使用 MRJob 库在 Python 中编写 MapReduce 程序。
视频教程中的一个示例是通过 location_id 找到最高温度。从那之后写另一个程序来找到最低温度 location_id 也很简单。
我想知道,有没有办法在单个 mapreduce 程序中通过 location_id 产生最高和最低温度?。以下是我的做法:
from mrjob.job import MRJob
'''Sample Data
ITE00100554,18000101,TMAX,-75,,,E,
ITE00100554,18000101,TMIN,-148,,,E,
GM000010962,18000101,PRCP,0,,,E,
EZE00100082,18000101,TMAX,-86,,,E,
EZE00100082,18000101,TMIN,-135,,,E,
ITE00100554,18000102,TMAX,-60,,I,E,
ITE00100554,18000102,TMIN,-125,,,E,
GM000010962,18000102,PRCP,0,,,E,
EZE00100082,18000102,TMAX,-44,,,E,
Output I am expecting to see:
ITE00100554 32.3 20.2
EZE00100082 34.4 19.6
'''
class MaxMinTemperature(MRJob):
def mapper(self, _, line):
location, datetime, measure, temperature, w, x, y, z = line.split(',')
temperature = float(temperature)/10
if measure == 'TMAX' or measure == 'TMIN':
yield location, temperature
def reducer(self, location, temperatures):
yield location, max(temperatures), min(temperatures)
if __name__ == '__main__':
MaxMinTemperature.run()
我收到以下错误:
File "MaxMinTemperature.py", line 12, in reducer
yield location, max(temperatures), min(temperatures)
ValueError: min() arg is an empty sequence
这可能吗?
感谢您的帮助。
希夫
reducer 有两个问题:
如果您检查温度参数的类型,您会发现它是一个发电机。一个生成器只能被遍历一次,所以你不能将同一个生成器同时传递给 'min' 和 'max' 函数。正确的解决方案是手动遍历它。错误的解决方案 - 将其转换为列表 - 可能会导致足够大的输入出现内存不足错误,因为列表将其所有元素保存在内存中而生成器不会。
reducer 的结果必须是二元组。所以你需要在另一个元组中组合你的最低和最高温度。
完整的工作解决方案:
class MaxMinTemperature(MRJob):
def mapper(self, _, line):
location, datetime, measure, temperature, w, x, y, z = line.split(',')
temperature = float(temperature)/10
if measure in ('TMAX', 'TMIN'):
yield location, temperature
def reducer(self, location, temperatures):
min_temp = next(temperatures)
max_temp = min_temp
for item in temperatures:
min_temp = min(item, min_temp)
max_temp = max(item, max_temp)
yield location, (min_temp, max_temp)
问题是您的 reducer
方法中的 temperatures
是 generator.
为了更好地理解,让我们创建一个简单的生成器并查看其行为:
def my_gen(an_iterable):
for item in an_iterable:
yield item
my_generator = my_gen([1,2,3,4,5])
print(type(my_generator)) # <class 'generator'>
О 这种对象的一个特点是一旦耗尽,就不能再使用它:
print(list(my_generator)) # [1, 2, 3, 4, 5]
print(list(my_generator)) # []
因此 max()
和 min()
的顺序执行会导致错误:
my_generator = my_gen([1,2,3,4,5])
print(max(my_generator)) # 5
print(min(my_generator)) # ValueError: min() arg is an empty sequence
因此,您不能将同一个生成器与 max()
和 min()
内置函数一起使用,因为在第二次使用时,生成器将累死了。
相反,您可以:
1) 将生成器转换为列表并使用它:
my_generator = my_gen([1,2,3,4,5])
my_list = list(my_generator)
print(max(my_list)) # 5
print(min(my_list)) # 1
2) 或在 1 个 for 循环中提取生成器的最小值和最大值:
my_generator = my_gen([1,2,3,4,5])
from functools import reduce
val_max, val_min = reduce(lambda x,y: (max(y, x[0]), min(y, x[1])), my_generator, (float('-inf'), float('inf')))
print(val_max, val_min) # 5 1
因此,reducer
的以下编辑:
def reducer(self, location, temperatures):
tempr_list = list(temperatures)
yield location, max(tempr_list), min(tempr_list)
应该修复错误。
我是一名初学者,刚开始使用 MRJob 库在 Python 中编写 MapReduce 程序。
视频教程中的一个示例是通过 location_id 找到最高温度。从那之后写另一个程序来找到最低温度 location_id 也很简单。
我想知道,有没有办法在单个 mapreduce 程序中通过 location_id 产生最高和最低温度?。以下是我的做法:
from mrjob.job import MRJob
'''Sample Data
ITE00100554,18000101,TMAX,-75,,,E,
ITE00100554,18000101,TMIN,-148,,,E,
GM000010962,18000101,PRCP,0,,,E,
EZE00100082,18000101,TMAX,-86,,,E,
EZE00100082,18000101,TMIN,-135,,,E,
ITE00100554,18000102,TMAX,-60,,I,E,
ITE00100554,18000102,TMIN,-125,,,E,
GM000010962,18000102,PRCP,0,,,E,
EZE00100082,18000102,TMAX,-44,,,E,
Output I am expecting to see:
ITE00100554 32.3 20.2
EZE00100082 34.4 19.6
'''
class MaxMinTemperature(MRJob):
def mapper(self, _, line):
location, datetime, measure, temperature, w, x, y, z = line.split(',')
temperature = float(temperature)/10
if measure == 'TMAX' or measure == 'TMIN':
yield location, temperature
def reducer(self, location, temperatures):
yield location, max(temperatures), min(temperatures)
if __name__ == '__main__':
MaxMinTemperature.run()
我收到以下错误:
File "MaxMinTemperature.py", line 12, in reducer
yield location, max(temperatures), min(temperatures)
ValueError: min() arg is an empty sequence
这可能吗?
感谢您的帮助。
希夫
reducer 有两个问题:
如果您检查温度参数的类型,您会发现它是一个发电机。一个生成器只能被遍历一次,所以你不能将同一个生成器同时传递给 'min' 和 'max' 函数。正确的解决方案是手动遍历它。错误的解决方案 - 将其转换为列表 - 可能会导致足够大的输入出现内存不足错误,因为列表将其所有元素保存在内存中而生成器不会。
reducer 的结果必须是二元组。所以你需要在另一个元组中组合你的最低和最高温度。
完整的工作解决方案:
class MaxMinTemperature(MRJob):
def mapper(self, _, line):
location, datetime, measure, temperature, w, x, y, z = line.split(',')
temperature = float(temperature)/10
if measure in ('TMAX', 'TMIN'):
yield location, temperature
def reducer(self, location, temperatures):
min_temp = next(temperatures)
max_temp = min_temp
for item in temperatures:
min_temp = min(item, min_temp)
max_temp = max(item, max_temp)
yield location, (min_temp, max_temp)
问题是您的 reducer
方法中的 temperatures
是 generator.
为了更好地理解,让我们创建一个简单的生成器并查看其行为:
def my_gen(an_iterable):
for item in an_iterable:
yield item
my_generator = my_gen([1,2,3,4,5])
print(type(my_generator)) # <class 'generator'>
О 这种对象的一个特点是一旦耗尽,就不能再使用它:
print(list(my_generator)) # [1, 2, 3, 4, 5]
print(list(my_generator)) # []
因此 max()
和 min()
的顺序执行会导致错误:
my_generator = my_gen([1,2,3,4,5])
print(max(my_generator)) # 5
print(min(my_generator)) # ValueError: min() arg is an empty sequence
因此,您不能将同一个生成器与 max()
和 min()
内置函数一起使用,因为在第二次使用时,生成器将累死了。
相反,您可以:
1) 将生成器转换为列表并使用它:
my_generator = my_gen([1,2,3,4,5])
my_list = list(my_generator)
print(max(my_list)) # 5
print(min(my_list)) # 1
2) 或在 1 个 for 循环中提取生成器的最小值和最大值:
my_generator = my_gen([1,2,3,4,5])
from functools import reduce
val_max, val_min = reduce(lambda x,y: (max(y, x[0]), min(y, x[1])), my_generator, (float('-inf'), float('inf')))
print(val_max, val_min) # 5 1
因此,reducer
的以下编辑:
def reducer(self, location, temperatures):
tempr_list = list(temperatures)
yield location, max(tempr_list), min(tempr_list)
应该修复错误。