在 Hadoop MapReduce(使用 Mongo Hadoop 连接器)之后,重复记录被写入 MongoDB
Duplicate records get written to MongoDB after Hadoop MapReduce (using Mongo Hadoop Connector)
我们在 AWS EMR 上的 Hadoop 测试环境
- 1个主节点
- 2个从属节点
当我们提交一个小测试作业时,它会触发 1 个地图任务。 map 任务完成后,将触发 3 个 reduce 任务。
reduce 任务完成后,我们的输出数据将写入 Mongo 集合。但是我们注意到,在某些情况下,输出中有重复的记录。这导致我们的下游处理任务崩溃,因为它们不期望重复。
我注意到的一件事是,其中一个 reduce 任务有时会被终止,然后由 hadoop 重新启动 - 如果它在将数据写入 Mongo 的过程中被终止,这会导致重复记录吗?
是否有任何方法可以从日志中查看 Mongo hadoop 连接器是否确实将数据写入 Mongo?
有什么方法可以确保在提交到 Mongo 之前完全减少所有数据,这样就没有重复项了吗?
如果集群中只有 1 个主节点和 1 个从节点,我们没有遇到过这个问题。然而,这显然是任何扩展尝试的主要障碍...
更新解决问题
根据@ruby 的回答,我创建了 bootstrap 操作来禁用 EMR 上的推测执行。
Mongo 最近还发布了 Mongo hadoop 连接器的更新版本,增加了对推测执行 (1.4.0-rc0) 的支持
https://github.com/mongodb/mongo-hadoop/releases
升级到最新的jar文件并添加bootstrap操作后,发现问题还是没有完全解决。
经过进一步调查,我发现根本问题与合并器步骤的输出如何路由到缩减器任务有关。我们没有实现自定义分区程序,因此 hadoop 使用我们的 Key 实体的 hashCode() 方法。这是使用 Java Objects.hash() 方法,不应在分布式系统上使用该方法,因为它不会 return 跨单独 Java 实例的可靠哈希值。
我们实施了自己的自定义分区程序,这最终解决了重复问题。
Turn off speculative execution by setting these properties in driver class or client side mapred-site.xml.
<property>
<name>mapred.map.tasks.speculative.execution</name>
<value>false</value>
</property>
<property>
<name>mapred.reduce.tasks.speculative.execution</name>
<value>false</value>
</property>
我们在 AWS EMR 上的 Hadoop 测试环境
- 1个主节点
- 2个从属节点
当我们提交一个小测试作业时,它会触发 1 个地图任务。 map 任务完成后,将触发 3 个 reduce 任务。
reduce 任务完成后,我们的输出数据将写入 Mongo 集合。但是我们注意到,在某些情况下,输出中有重复的记录。这导致我们的下游处理任务崩溃,因为它们不期望重复。
我注意到的一件事是,其中一个 reduce 任务有时会被终止,然后由 hadoop 重新启动 - 如果它在将数据写入 Mongo 的过程中被终止,这会导致重复记录吗?
是否有任何方法可以从日志中查看 Mongo hadoop 连接器是否确实将数据写入 Mongo?
有什么方法可以确保在提交到 Mongo 之前完全减少所有数据,这样就没有重复项了吗?
如果集群中只有 1 个主节点和 1 个从节点,我们没有遇到过这个问题。然而,这显然是任何扩展尝试的主要障碍...
更新解决问题
根据@ruby 的回答,我创建了 bootstrap 操作来禁用 EMR 上的推测执行。
Mongo 最近还发布了 Mongo hadoop 连接器的更新版本,增加了对推测执行 (1.4.0-rc0) 的支持 https://github.com/mongodb/mongo-hadoop/releases
升级到最新的jar文件并添加bootstrap操作后,发现问题还是没有完全解决。 经过进一步调查,我发现根本问题与合并器步骤的输出如何路由到缩减器任务有关。我们没有实现自定义分区程序,因此 hadoop 使用我们的 Key 实体的 hashCode() 方法。这是使用 Java Objects.hash() 方法,不应在分布式系统上使用该方法,因为它不会 return 跨单独 Java 实例的可靠哈希值。
我们实施了自己的自定义分区程序,这最终解决了重复问题。
Turn off speculative execution by setting these properties in driver class or client side mapred-site.xml.
<property> <name>mapred.map.tasks.speculative.execution</name> <value>false</value> </property> <property> <name>mapred.reduce.tasks.speculative.execution</name> <value>false</value> </property>