如何上传文件到新的EMR集群

How to upload files to new EMR cluster

我想创建一个新的 EMR 集群,运行 一个 PySpark 作业并将其销毁。理想情况下,我想通过在创建集群时添加一个步骤来做到这一点。我将 运行 在本地启动作业的命令如下所示:

spark-submit calculate.py --input x.csv --output output

我不明白的是如何确保 calculate.py 在主节点上已经可用。我看到了从 S3 存储桶 here 读取 python 脚本的参考,但我无法让它工作。

现在我有单独的命令来创建集群、将脚本放在主节点上并添加步骤。这个问题是集群在作业步骤完成后保持 运行ning。

一种方法是在 bootstrap 操作中将您需要的文件复制到节点上。我们有一个小的 shell-脚本,s3-to-local.sh,位于 S3 上的一个桶中,它就是这样做的。该脚本很简单,看起来像这样:

#!/bin/bash

echo "Copying  to "
hadoop fs -copyToLocal  

然后在我的 create-cluster 命令中添加 bootstrap 操作:

aws --profile myProfile create-cluster \
--name "My cluster name" \
--auto-terminate \
--bootstrap-actions Path=s3://path/to/s3-to-local.sh,Name=copy-file,Args=[s3://path/to/file.ext,/local/filepath/file.ext] \
--steps ...

并且 file.ext 被复制到我的 EMR 集群的节点上。

至于你的集群没有终止,你可以像我在上面那样添加 auto-terminate 标志。当所有步骤完成时,这将导致您的集群终止。

请注意,还有其他方法可以做到这一点,但这是一种简单且非常直接的方法。它有效:)

我通过创建一个额外的步骤解决了这个问题,该步骤仅调用 hadoop fs -copyToLocal 来下载文件。

我在 bootstrap 步骤遇到问题,hadoop 命令尚未安装。

使用 boto3 的完整工作示例:

import boto3

client = boto3.client('emr', region_name='eu-central-1')

cluster_id = client.run_job_flow(
    Name='My job',
    LogUri='s3://my-bucket/emr-logs/',
    ReleaseLabel='emr-4.0.0',
    Instances={
        'MasterInstanceType': 'm3.xlarge',
        'SlaveInstanceType': 'm3.xlarge',
        'InstanceCount': 2,
        'Ec2KeyName': 'my_key',
        'Ec2SubnetId': 'subnet-123456'
    },
    Applications=[
        {
            'Name': 'Spark'
        }
    ],
    Steps=[
        {
            'Name': 'Copy files to master',
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': [
                    'hadoop',
                    'fs',
                    '-copyToLocal',
                    '%s/code/*' % S3_BUCKET,
                    '/home/hadoop/'
                ]
            }
        },
        {
            'Name': 'Calculate step',
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': [
                    'spark-submit',
                    '/home/hadoop/calculate.py',
                    '--param',
                    'value'
                ]
            }
        }
    ],
    VisibleToAllUsers=True,
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole')

print cluster_id