Hadoop 性能建模

Hadoop performance modeling

我正在研究 Hadoop 性能建模。 Hadoop 有 200 多个参数,因此无法手动设置它们。我们经常 运行 我们的 hadoop 作业使用默认参数值(比如使用默认值 io.sort.mb、io.sort.record.percent、mapred.output.compress 等)。但是使用默认参数值给了我们次优性能。 Herodotos Herodotou (http://www.cs.duke.edu/starfish/files/vldb11-job-optimization.pdf) 在这方面做了一些工作来提高性能。但我对他们的工作有以下疑问--

  1. 他们正在为 MapReduce 作业的所有阶段(读取、映射、收集等)在作业开始时固定参数值(根据数据的比例假设)。我们可以根据 运行 时间环境(如集群配置、底层文件系统等)在 运行 时间为每个阶段设置不同的这些参数值,方法是将特定节点的 Hadoop 配置日志文件更改为从节点获得最佳性能?
  2. 他们正在为 Hadoop 核心使用白盒模型,他们是否仍然适用于 当前 Hadoop ( http://arxiv.org/pdf/1106.0940.pdf) ?

不,您不能动态更改每个节点每个作业的 MapReduce 参数。

配置节点集

您可以做的是在配置文件中静态更改每个节点的配置参数(通常位于 /etc/hadoop/conf),这样您就可以充分利用不同 h/w 配置。

示例:假设您有 20 个具有不同硬件配置的工作节点,例如:

  • 10 配置 128GB 内存,24 核
  • 10 配置 64GB 内存,12 核

在这种情况下,您可能希望配置每个相同的服务器以充分利用硬件,例如,您可能希望 运行 在具有更多 RAM 的工作节点上执行更多子任务(映射器和缩减器)和核心,例如:

  • 具有 128GB RAM、24 个内核的节点 => 36 个工作任务(映射器 + reducer),每个工作任务的 JVM 堆大约为 3GB。
  • 具有 64GB RAM、12 个内核的节点 => 18 个工作任务(映射器 + reducer),每个工作任务的 JVM 堆大约为 3GB。

因此,您需要使用适当的参数分别配置节点集。

使用 ToolRunner 将配置参数动态传递给作业:

此外,您可以动态更改每个作业的 MapReduce 作业参数,但这些参数将应用于整个集群,而不仅仅是一组节点。如果您的 MapReduce 作业驱动程序扩展 ToolRunner.

ToolRunner 允许您解析 generic hadoop command line arguments。您将能够使用 -D property.name=property.value.

传递 MapReduce 配置参数

您几乎可以将几乎所有 hadoop 参数动态传递给作业。但大多数 通常 将 MapReduce 配置参数动态传递给作业是:

  • mapreduce.task.io.sort.mb
  • mapreduce.map.speculative
  • mapreduce.job.reduces
  • mapreduce.task.io.sort.factor
  • mapreduce.map.output.compress
  • mapreduce.map.outout.compress.codec
  • mapreduce.reduce.memory.mb
  • mapreduce.map.memory.mb

这是一个示例 terasort 作业,每个作业动态传递大量参数:


hadoop jar hadoop-mapreduce-examples.jar tearsort \
    
  -Ddfs.replication=1 -Dmapreduce.task.io.sort.mb=500 \
    
  -Dmapreduce.map.sort.splill.percent=0.9 \
    
  -Dmapreduce.reduce.shuffle.parallelcopies=10 \
    
  -Dmapreduce.reduce.shuffle.memory.limit.percent=0.1 \
    
  -Dmapreduce.reduce.shuffle.input.buffer.percent=0.95 \
    
  -Dmapreduce.reduce.input.buffer.percent=0.95 \
    
  -Dmapreduce.reduce.shuffle.merge.percent=0.95 \
    
  -Dmapreduce.reduce.merge.inmem.threshold=0 \
    
  -Dmapreduce.job.speculative.speculativecap=0.05 \
    
  -Dmapreduce.map.speculative=false \
    
  -Dmapreduce.map.reduce.speculative=false \

  -Dmapreduce.job.jvm.numtasks=-1 \
  -Dmapreduce.job.reduces=84 \

  -Dmapreduce.task.io.sort.factor=100 \
    
  -Dmapreduce.map.output.compress=true \

  -Dmapreduce.map.outout.compress.codec=\
        
    org.apache.hadoop.io.compress.SnappyCodec \
    
  -Dmapreduce.job.reduce.slowstart.completedmaps=0.4 \
    
  -Dmapreduce.reduce.merge.memtomem.enabled=fasle \
    
  -Dmapreduce.reduce.memory.totalbytes=12348030976 \
    
  -Dmapreduce.reduce.memory.mb=12288 \

  -Dmapreduce.reduce.java.opts=“-Xms11776m -Xmx11776m \
      
    -XX:+UseConcMarkSweepGC -XX:+CMSIncrementalMode \
      
    -XX:+CMSIncrementalPacing -XX:ParallelGCThreads=4” \

  -Dmapreduce.map.memory.mb=4096 \

  -Dmapreduce.map.java.opts=“-Xmx1356m” \
    
  /terasort-input /terasort-output