Airflow/Luigi 用于 AWS EMR 自动集群创建和 pyspark 部署

Airflow/Luigi for AWS EMR automatic cluster creation and pyspark deployment

我是气流自动化的新手,我现在不知道是否可以使用 apache 气流(或 luigi 等)来做到这一点,或者我应该制作一个长 bash 文件来做到这一点。

我想为此构建 dag

  1. Create/clone AWS EMR 上的集群
  2. 安装python 要求
  3. 安装pyspark相关库
  4. 从 github
  5. 获取最新代码
  6. 提交 spark 作业
  7. 完成时终止集群

对于个别步骤,我可以制作如下所示的 .sh 文件(不确定这样做是否好)但不知道如何在气流中做到这一点

1) 使用 cluster.sh

创建一个聚类
 aws emr create-cluster \
    --name "1-node dummy cluster" \
    --instance-type m3.xlarge \
    --release-label emr-4.1.0 \
    --instance-count 1 \
    --use-default-roles \
    --applications Name=Spark \
    --auto-terminate

2 & 3 & 4) 克隆 git 并安装要求 codesetup.sh

git clone some-repo.git
pip install -r requirements.txt
mv xyz.jar /usr/lib/spark/xyz.jar

5) 运行 激发作业 sparkjob.sh

aws emr add-steps --cluster-id <Your EMR cluster id> --steps Type=spark,Name=TestJob,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,pythonjob.py,s3a://your-source-bucket/data/data.csv,s3a://your-destination-bucket/test-output/],ActionOnFailure=CONTINUE

6) 不确定,可能是这个

  terminate-clusters
--cluster-ids <value> [<value>...]

终于可以作为一个.sh文件执行了。我需要通过 airflow/luigi.

了解解决此问题的好方法

我发现了什么:

我发现这个 post 很接近,但它已经过时(2016 年)并且缺少剧本的连接和代码

https://www.agari.com/email-security-blog/automated-model-building-emr-spark-airflow/

Airflow 有这方面的运营商。 airflow doc

我想通了,有两种选择可以做到这一点

1) 我们可以在 emr create-clusteraddstep 的帮助下制作一个 bash 脚本,然后使用 airflow Bashoperator 安排它

或者,这两个有包装器,称为 sparksteps

他们文档中的示例

sparksteps examples/episodes.py \
  --s3-bucket $AWS_S3_BUCKET \
  --aws-region us-east-1 \
  --release-label emr-4.7.0 \
  --uploads examples/lib examples/episodes.avro \
  --submit-args="--deploy-mode client --jars /home/hadoop/lib/spark-avro_2.10-2.0.2-custom.jar" \
  --app-args="--input /home/hadoop/episodes.avro" \
  --tags Application="Spark Steps" \
  --debug

您可以使用您选择的默认选项制作 .sh script。准备好这个脚本后,你可以从 airflow bashoperator 中调用它,如下所示

create_command = "sparkstep_custom.sh "    

t1 = BashOperator(
        task_id= 'create_file',
        bash_command=create_command,
        dag=dag
   )

2) 你可以使用 aws 的 airflow 自己的操作符来做到这一点。

EmrCreateJobFlowOperator(用于启动集群)EmrAddStepsOperator(用于提交 spark 作业) EmrStepSensor(跟踪步骤何时完成) EmrTerminateJobFlowOperator(在步骤完成时终止集群)

创建集群和提交步骤的基本示例

my_step=[

    {
        'Name': 'setup - copy files',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['aws', 's3', 'cp', S3_URI + 'test.py', '/home/hadoop/']
        }
    },
{
        'Name': 'setup - copy files 3',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['aws', 's3', 'cp', S3_URI + 'myfiledependecy.py', '/home/hadoop/']
        }
    },
 {
        'Name': 'Run Spark',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit','--jars', "jar1.jar,jar2.jar", '--py-files','/home/hadoop/myfiledependecy.py','/home/hadoop/test.py']
        }
    }
    ]


cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow2',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    dag=dag
)

step_adder_pre_step = EmrAddStepsOperator(
    task_id='pre_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=my_steps,
    dag=dag
)
step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('pre_step', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag
)

此外,要将代码上传到 s3(我很想从 github_ 获取最新代码,可以使用 s3boto3Pythonoperator

简单的例子

S3_BUCKET = 'you_bucket_name'
S3_URI = 's3://{bucket}/'.format(bucket=S3_BUCKET)
def upload_file_to_S3(filename, key, bucket_name):
    s3.Bucket(bucket_name).upload_file(filename, key)

upload_to_S3_task = PythonOperator(
    task_id='upload_to_S3',
    python_callable=upload_file_to_S3,
    op_kwargs={
        'filename': configdata['project_path']+'test.py',
        'key': 'test.py',
        'bucket_name': 'dep-buck',
    },
    dag=dag)