如何使用 Python 输出键值对,以便 1 小时内的时间在 MapReduce 中的减速器中结束?

How output key,value pair so that time within 1 hour ends up in a reducer in MapReduce using Python?

我有一种情况需要处理一个非常大的格式如下的文本文件:

ID \t time \t duration \t Description \t status

我想利用 MapReduce 来帮助我处理这个文件。我知道 MapReduce 基于键值对工作。 Mapper 将输出键和一些值,MapReduce 将确保所有相同的键最终都在 1 个 reducer 中。

我想要在减速器中结束的是时间间隔在 1 小时以内的行。然后在 reducer 中,我想访问所有其他信息以及 ID、持续时间、状态来做其他事情。所以我猜想输出的值是一个列表还是什么?

我有一些 Python 代码来处理输入数据。 mapper.py

#!/usr/bin/env python
import sys
import re
for line in sys.stdin:
   line=line.strip()
   portions=re.split(r'\t+',line)
   time=portions[1]
#output key,value by print to stdout for reducer.py to read in.

请注意,我的数据集中的时间已经是 POSIX 时间格式。

我如何在 Mapper 中输出键值对来做到这一点?

我对 MapReduce/Hadoop 还是很陌生,感谢所有帮助。提前致谢!

这是一个策略:

  • 来自映射器:发出每条记录的三个副本并使用二次排序:

    ( (复合键), 值) =

    • ((消息的小时-一小时,当前消息的精确时间),消息)
    • ((消息时间,消息准确时间),消息)
    • ((消息小时+一小时,消息精确时间),消息)

现在:您需要进行标准二次排序:

  • 仅将 Key 的前半部分(消息的小时)设置为 Partitioner
  • setGroupingComparator 仅设置为密钥的前半部分(消息的小时数)
  • setSortingComparator 为(消息的小时,消息的精确时间)

在 reducer 中:每个 reducer 组在消息的精确时间 +/- 60 到 120 分钟内接收所有消息。减速器按排序顺序查看所有 "precise time of message"。因此,您可以在每个 reducer

中保持过去 60 分钟内查看的所有消息的滑动 window

注意 以上假设 60 分钟消息的数据可以容纳在单个 reducer 任务的内存中。否则,您将需要将数据写入磁盘作为 windowing 函数的一部分。

更新 OP 要求对 windowing 进行进一步说明,所以我们开始吧。

从Mapper-emitted key的角度考虑:每条输入记录有3个key。现在在 Reducer 上,这意味着每个输入记录出现在三个不同的组中。这样做的原因是我们需要针对每个输入记录同时考虑超前和滞后记录。所以现在我们让每个组都可以访问所有输入记录,这些记录可能在最早记录的 60 分钟内,也可能在最新记录的 60 分钟内。由于记录按每小时最早的秒数分组:这意味着 -60(分钟)到 +120(最大)与属于给定小时组内的任何记录相比。