Airflow/Luigi 用于 AWS EMR 自动集群创建和 pyspark 部署
Airflow/Luigi for AWS EMR automatic cluster creation and pyspark deployment
我是气流自动化的新手,我现在不知道是否可以使用 apache 气流(或 luigi 等)来做到这一点,或者我应该制作一个长 bash 文件来做到这一点。
我想为此构建 dag
- Create/clone AWS EMR 上的集群
- 安装python 要求
- 安装pyspark相关库
- 从 github
获取最新代码
- 提交 spark 作业
- 完成时终止集群
对于个别步骤,我可以制作如下所示的 .sh 文件(不确定这样做是否好)但不知道如何在气流中做到这一点
1) 使用 cluster.sh
创建一个聚类
aws emr create-cluster \
--name "1-node dummy cluster" \
--instance-type m3.xlarge \
--release-label emr-4.1.0 \
--instance-count 1 \
--use-default-roles \
--applications Name=Spark \
--auto-terminate
2 & 3 & 4) 克隆 git 并安装要求 codesetup.sh
git clone some-repo.git
pip install -r requirements.txt
mv xyz.jar /usr/lib/spark/xyz.jar
5) 运行 激发作业 sparkjob.sh
aws emr add-steps --cluster-id <Your EMR cluster id> --steps Type=spark,Name=TestJob,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,pythonjob.py,s3a://your-source-bucket/data/data.csv,s3a://your-destination-bucket/test-output/],ActionOnFailure=CONTINUE
6) 不确定,可能是这个
terminate-clusters
--cluster-ids <value> [<value>...]
终于可以作为一个.sh文件执行了。我需要通过 airflow/luigi.
了解解决此问题的好方法
我发现了什么:
我发现这个 post 很接近,但它已经过时(2016 年)并且缺少剧本的连接和代码
https://www.agari.com/email-security-blog/automated-model-building-emr-spark-airflow/
Airflow 有这方面的运营商。 airflow doc
我想通了,有两种选择可以做到这一点
1) 我们可以在 emr create-cluster
和 addstep
的帮助下制作一个 bash 脚本,然后使用 airflow Bashoperator
安排它
或者,这两个有包装器,称为 sparksteps
他们文档中的示例
sparksteps examples/episodes.py \
--s3-bucket $AWS_S3_BUCKET \
--aws-region us-east-1 \
--release-label emr-4.7.0 \
--uploads examples/lib examples/episodes.avro \
--submit-args="--deploy-mode client --jars /home/hadoop/lib/spark-avro_2.10-2.0.2-custom.jar" \
--app-args="--input /home/hadoop/episodes.avro" \
--tags Application="Spark Steps" \
--debug
您可以使用您选择的默认选项制作 .sh script
。准备好这个脚本后,你可以从 airflow bashoperator 中调用它,如下所示
create_command = "sparkstep_custom.sh "
t1 = BashOperator(
task_id= 'create_file',
bash_command=create_command,
dag=dag
)
2) 你可以使用 aws 的 airflow 自己的操作符来做到这一点。
EmrCreateJobFlowOperator
(用于启动集群)EmrAddStepsOperator
(用于提交 spark 作业)
EmrStepSensor
(跟踪步骤何时完成)
EmrTerminateJobFlowOperator
(在步骤完成时终止集群)
创建集群和提交步骤的基本示例
my_step=[
{
'Name': 'setup - copy files',
'ActionOnFailure': 'CANCEL_AND_WAIT',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['aws', 's3', 'cp', S3_URI + 'test.py', '/home/hadoop/']
}
},
{
'Name': 'setup - copy files 3',
'ActionOnFailure': 'CANCEL_AND_WAIT',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['aws', 's3', 'cp', S3_URI + 'myfiledependecy.py', '/home/hadoop/']
}
},
{
'Name': 'Run Spark',
'ActionOnFailure': 'CANCEL_AND_WAIT',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['spark-submit','--jars', "jar1.jar,jar2.jar", '--py-files','/home/hadoop/myfiledependecy.py','/home/hadoop/test.py']
}
}
]
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow2',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
dag=dag
)
step_adder_pre_step = EmrAddStepsOperator(
task_id='pre_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
aws_conn_id='aws_default',
steps=my_steps,
dag=dag
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
step_id="{{ task_instance.xcom_pull('pre_step', key='return_value')[0] }}",
aws_conn_id='aws_default',
dag=dag
)
cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
aws_conn_id='aws_default',
dag=dag
)
此外,要将代码上传到 s3(我很想从 github_ 获取最新代码,可以使用 s3
、boto3
和Pythonoperator
简单的例子
S3_BUCKET = 'you_bucket_name'
S3_URI = 's3://{bucket}/'.format(bucket=S3_BUCKET)
def upload_file_to_S3(filename, key, bucket_name):
s3.Bucket(bucket_name).upload_file(filename, key)
upload_to_S3_task = PythonOperator(
task_id='upload_to_S3',
python_callable=upload_file_to_S3,
op_kwargs={
'filename': configdata['project_path']+'test.py',
'key': 'test.py',
'bucket_name': 'dep-buck',
},
dag=dag)
我是气流自动化的新手,我现在不知道是否可以使用 apache 气流(或 luigi 等)来做到这一点,或者我应该制作一个长 bash 文件来做到这一点。
我想为此构建 dag
- Create/clone AWS EMR 上的集群
- 安装python 要求
- 安装pyspark相关库
- 从 github 获取最新代码
- 提交 spark 作业
- 完成时终止集群
对于个别步骤,我可以制作如下所示的 .sh 文件(不确定这样做是否好)但不知道如何在气流中做到这一点
1) 使用 cluster.sh
aws emr create-cluster \
--name "1-node dummy cluster" \
--instance-type m3.xlarge \
--release-label emr-4.1.0 \
--instance-count 1 \
--use-default-roles \
--applications Name=Spark \
--auto-terminate
2 & 3 & 4) 克隆 git 并安装要求 codesetup.sh
git clone some-repo.git
pip install -r requirements.txt
mv xyz.jar /usr/lib/spark/xyz.jar
5) 运行 激发作业 sparkjob.sh
aws emr add-steps --cluster-id <Your EMR cluster id> --steps Type=spark,Name=TestJob,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,pythonjob.py,s3a://your-source-bucket/data/data.csv,s3a://your-destination-bucket/test-output/],ActionOnFailure=CONTINUE
6) 不确定,可能是这个
terminate-clusters
--cluster-ids <value> [<value>...]
终于可以作为一个.sh文件执行了。我需要通过 airflow/luigi.
了解解决此问题的好方法我发现了什么:
我发现这个 post 很接近,但它已经过时(2016 年)并且缺少剧本的连接和代码
https://www.agari.com/email-security-blog/automated-model-building-emr-spark-airflow/
Airflow 有这方面的运营商。 airflow doc
我想通了,有两种选择可以做到这一点
1) 我们可以在 emr create-cluster
和 addstep
的帮助下制作一个 bash 脚本,然后使用 airflow Bashoperator
安排它
或者,这两个有包装器,称为 sparksteps
他们文档中的示例
sparksteps examples/episodes.py \
--s3-bucket $AWS_S3_BUCKET \
--aws-region us-east-1 \
--release-label emr-4.7.0 \
--uploads examples/lib examples/episodes.avro \
--submit-args="--deploy-mode client --jars /home/hadoop/lib/spark-avro_2.10-2.0.2-custom.jar" \
--app-args="--input /home/hadoop/episodes.avro" \
--tags Application="Spark Steps" \
--debug
您可以使用您选择的默认选项制作 .sh script
。准备好这个脚本后,你可以从 airflow bashoperator 中调用它,如下所示
create_command = "sparkstep_custom.sh "
t1 = BashOperator(
task_id= 'create_file',
bash_command=create_command,
dag=dag
)
2) 你可以使用 aws 的 airflow 自己的操作符来做到这一点。
EmrCreateJobFlowOperator
(用于启动集群)EmrAddStepsOperator
(用于提交 spark 作业)
EmrStepSensor
(跟踪步骤何时完成)
EmrTerminateJobFlowOperator
(在步骤完成时终止集群)
创建集群和提交步骤的基本示例
my_step=[
{
'Name': 'setup - copy files',
'ActionOnFailure': 'CANCEL_AND_WAIT',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['aws', 's3', 'cp', S3_URI + 'test.py', '/home/hadoop/']
}
},
{
'Name': 'setup - copy files 3',
'ActionOnFailure': 'CANCEL_AND_WAIT',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['aws', 's3', 'cp', S3_URI + 'myfiledependecy.py', '/home/hadoop/']
}
},
{
'Name': 'Run Spark',
'ActionOnFailure': 'CANCEL_AND_WAIT',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['spark-submit','--jars', "jar1.jar,jar2.jar", '--py-files','/home/hadoop/myfiledependecy.py','/home/hadoop/test.py']
}
}
]
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow2',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
dag=dag
)
step_adder_pre_step = EmrAddStepsOperator(
task_id='pre_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
aws_conn_id='aws_default',
steps=my_steps,
dag=dag
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
step_id="{{ task_instance.xcom_pull('pre_step', key='return_value')[0] }}",
aws_conn_id='aws_default',
dag=dag
)
cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
aws_conn_id='aws_default',
dag=dag
)
此外,要将代码上传到 s3(我很想从 github_ 获取最新代码,可以使用 s3
、boto3
和Pythonoperator
简单的例子
S3_BUCKET = 'you_bucket_name'
S3_URI = 's3://{bucket}/'.format(bucket=S3_BUCKET)
def upload_file_to_S3(filename, key, bucket_name):
s3.Bucket(bucket_name).upload_file(filename, key)
upload_to_S3_task = PythonOperator(
task_id='upload_to_S3',
python_callable=upload_file_to_S3,
op_kwargs={
'filename': configdata['project_path']+'test.py',
'key': 'test.py',
'bucket_name': 'dep-buck',
},
dag=dag)