我们什么时候可以为 hadoop Mapper 初始化资源?

When can we init resources for a hadoop Mapper?

我有一个小型 sqlite 数据库(post 代码 -> 美国城市名称),我有一个很大的用户 S3 文件。我想将每个用户映射到与其 postcode 相关联的城市名称。

我遵循著名的 WordCount.java 示例,但我不确定 mapReduce 的内部工作方式:

1) 每 1 个拆分创建一次映射器,通常为 128 或 256mb。 您可以使用此参数配置拆分大小:mapreduce.input.fileinputformat.split.minsizemapreduce.input.fileinputformat.split.maxsize。如果输入文件小于拆分大小,则全部进入一个映射任务。

2) 您可以使用方法setupcleanup 为任务配置资源。 setup 在任务开始时调用一次,cleanup 在任务结束时调用一次。因此,您可以在 setup 方法中建立与数据库的连接(可能不仅仅是连接,而是将所有城市加载到内存中以提高性能)并关闭连接(如果您决定不加载数据,而只是连接)在 cleanup

MapReduce 是一个框架,用于编写应用程序以可靠和容错的方式在大型商用硬件集群上并行处理大数据。 MapReduce 在 HDFS(Hadoop 分布式文件系统)之上以两个不同的阶段执行,称为映射阶段和减少阶段。

回答您的问题我的映射器是否为每个 s3 输入文件创建一次?

创建的Mapper等于分割数 并且默认创建的分割等于块的数量。

高级概述类似于

input file->InputFormat->Splits->RecordReader->Mapper->Partitioner->Shuffle&Sort->Reducer->final output

示例,

  1. 您的输入文件- server1.log,server2.log,server3.log
  2. InputFormat 将根据块大小创建分割数(默认)
  3. 对应于每个Split,将分配一个Mapper来处理每个split。
  4. 要从 Split 获取记录行,RecordReader 将位于 Mapper 和 Split 之间。
  5. Partitioner 将启动。
  6. Partitioner Shuffle&Sort 阶段将开始后。
  7. 减速器
  8. 最终输出。

第二个问题的答案: 下面是Mapper的三种标准生命周期方法。

@Override
 protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
   throws IOException, InterruptedException {

  // Filter your data
  }

 }
 @Override
 protected void setup(Mapper<Object, Text, Text, IntWritable>.Context context)
   throws IOException, InterruptedException {
  System.out.println("calls only once at startup");
 }
 @Override
 protected void cleanup(Mapper<Object, Text, Text, IntWritable>.Context context)
   throws IOException, InterruptedException {
  System.out.println("calls only once at end");
 }