您如何使用 boto3(或其他方式)在 emr 上自动化 pyspark 作业?
How do you automate pyspark jobs on emr using boto3 (or otherwise)?
我正在创建一个作业来解析大量服务器数据,然后将其上传到 Redshift
数据库中。
我的工作流程如下:
- 从S3抓取日志数据
- 使用 spark
dataframes
或 spark sql 解析数据并写回 S3
- 将数据从 S3 上传到 Redshift。
我对如何自动执行此操作感到困惑,以便我的进程启动 EMR 集群,引导正确的程序进行安装,并运行我的 python 脚本,该脚本将包含用于解析的代码和写作。
是否有人可以与我分享任何示例、教程或经验来帮助我学习如何执行此操作?
查看 boto3 EMR docs to create the cluster. You essentially have to call run_job_flow 并创建运行所需程序的步骤。
import boto3
client = boto3.client('emr', region_name='us-east-1')
S3_BUCKET = 'MyS3Bucket'
S3_KEY = 'spark/main.py'
S3_URI = 's3://{bucket}/{key}'.format(bucket=S3_BUCKET, key=S3_KEY)
# upload file to an S3 bucket
s3 = boto3.resource('s3')
s3.meta.client.upload_file("myfile.py", S3_BUCKET, S3_KEY)
response = client.run_job_flow(
Name="My Spark Cluster",
ReleaseLabel='emr-4.6.0',
Instances={
'MasterInstanceType': 'm4.xlarge',
'SlaveInstanceType': 'm4.xlarge',
'InstanceCount': 4,
'KeepJobFlowAliveWhenNoSteps': True,
'TerminationProtected': False,
},
Applications=[
{
'Name': 'Spark'
}
],
BootstrapActions=[
{
'Name': 'Maximize Spark Default Config',
'ScriptBootstrapAction': {
'Path': 's3://support.elasticmapreduce/spark/maximize-spark-default-config',
}
},
],
Steps=[
{
'Name': 'Setup Debugging',
'ActionOnFailure': 'TERMINATE_CLUSTER',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['state-pusher-script']
}
},
{
'Name': 'setup - copy files',
'ActionOnFailure': 'CANCEL_AND_WAIT',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['aws', 's3', 'cp', S3_URI, '/home/hadoop/']
}
},
{
'Name': 'Run Spark',
'ActionOnFailure': 'CANCEL_AND_WAIT',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['spark-submit', '/home/hadoop/main.py']
}
}
],
VisibleToAllUsers=True,
JobFlowRole='EMR_EC2_DefaultRole',
ServiceRole='EMR_DefaultRole'
)
如果您知道作业流 ID,您还可以将步骤添加到 运行 集群:
job_flow_id = response['JobFlowId']
print("Job flow ID:", job_flow_id)
step_response = client.add_job_flow_steps(JobFlowId=job_flow_id, Steps=SomeMoreSteps)
step_ids = step_response['StepIds']
print("Step IDs:", step_ids)
有关更多配置,请查看 sparksteps。
只需使用 AWS Data Pipeline. You can setup your S3 bucket to trigger a lambda function every time a new file is placed inside the bucket https://docs.aws.amazon.com/lambda/latest/dg/with-s3-example.html. Then your Lambda function will activate your Data Pipeline https://aws.amazon.com/blogs/big-data/using-aws-lambda-for-event-driven-data-processing-pipelines/ 执行此操作,然后您的数据管道会使用 EmrCluster 启动一个新的 EMR 集群,然后您可以指定 bootstrap 选项,然后您可以 运行 您的 EMR 命令使用 EmrActivity,完成后它将终止您的 EMR 集群并停用数据管道。
其实,
我已经使用了 AWS 的 Step Functions,它是 Lambda 函数的状态机包装器,因此您可以使用 boto3
来使用 run_job_flow and you can use describe_cluaster to get the status of the cluster. Finally use a choice 启动 EMR Spark 作业。所以你的步骤函数看起来像这样(括号中的步骤函数类型:
运行 job(任务)-> Wait for X min(等待)-> Check status(任务)-> Branch(选择)[
=> 返回等待,或者
=> 完成]
我在 GitHub 上放了一个完整的示例,展示了如何使用 Boto3 完成所有这些操作。
long-lived 集群示例展示了如何在集群上创建和 运行 作业步骤,该集群从包含历史亚马逊评论数据的 public S3 存储桶中获取数据,执行一些 PySpark对其进行处理,并将输出写回 S3 存储桶。
- 创建 Amazon S3 存储桶并上传作业脚本。
- 创建演示使用的 AWS Identity and Access Management (IAM) 角色。
- 创建演示使用的 Amazon Elastic Compute Cloud (Amazon EC2) 安全组。
- 创建 short-lived 和 long-lived 集群并在其上执行 运行 作业步骤。
- 终止集群并清理所有资源。
我正在创建一个作业来解析大量服务器数据,然后将其上传到 Redshift
数据库中。
我的工作流程如下:
- 从S3抓取日志数据
- 使用 spark
dataframes
或 spark sql 解析数据并写回 S3 - 将数据从 S3 上传到 Redshift。
我对如何自动执行此操作感到困惑,以便我的进程启动 EMR 集群,引导正确的程序进行安装,并运行我的 python 脚本,该脚本将包含用于解析的代码和写作。
是否有人可以与我分享任何示例、教程或经验来帮助我学习如何执行此操作?
查看 boto3 EMR docs to create the cluster. You essentially have to call run_job_flow 并创建运行所需程序的步骤。
import boto3
client = boto3.client('emr', region_name='us-east-1')
S3_BUCKET = 'MyS3Bucket'
S3_KEY = 'spark/main.py'
S3_URI = 's3://{bucket}/{key}'.format(bucket=S3_BUCKET, key=S3_KEY)
# upload file to an S3 bucket
s3 = boto3.resource('s3')
s3.meta.client.upload_file("myfile.py", S3_BUCKET, S3_KEY)
response = client.run_job_flow(
Name="My Spark Cluster",
ReleaseLabel='emr-4.6.0',
Instances={
'MasterInstanceType': 'm4.xlarge',
'SlaveInstanceType': 'm4.xlarge',
'InstanceCount': 4,
'KeepJobFlowAliveWhenNoSteps': True,
'TerminationProtected': False,
},
Applications=[
{
'Name': 'Spark'
}
],
BootstrapActions=[
{
'Name': 'Maximize Spark Default Config',
'ScriptBootstrapAction': {
'Path': 's3://support.elasticmapreduce/spark/maximize-spark-default-config',
}
},
],
Steps=[
{
'Name': 'Setup Debugging',
'ActionOnFailure': 'TERMINATE_CLUSTER',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['state-pusher-script']
}
},
{
'Name': 'setup - copy files',
'ActionOnFailure': 'CANCEL_AND_WAIT',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['aws', 's3', 'cp', S3_URI, '/home/hadoop/']
}
},
{
'Name': 'Run Spark',
'ActionOnFailure': 'CANCEL_AND_WAIT',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
'Args': ['spark-submit', '/home/hadoop/main.py']
}
}
],
VisibleToAllUsers=True,
JobFlowRole='EMR_EC2_DefaultRole',
ServiceRole='EMR_DefaultRole'
)
如果您知道作业流 ID,您还可以将步骤添加到 运行 集群:
job_flow_id = response['JobFlowId']
print("Job flow ID:", job_flow_id)
step_response = client.add_job_flow_steps(JobFlowId=job_flow_id, Steps=SomeMoreSteps)
step_ids = step_response['StepIds']
print("Step IDs:", step_ids)
有关更多配置,请查看 sparksteps。
只需使用 AWS Data Pipeline. You can setup your S3 bucket to trigger a lambda function every time a new file is placed inside the bucket https://docs.aws.amazon.com/lambda/latest/dg/with-s3-example.html. Then your Lambda function will activate your Data Pipeline https://aws.amazon.com/blogs/big-data/using-aws-lambda-for-event-driven-data-processing-pipelines/ 执行此操作,然后您的数据管道会使用 EmrCluster 启动一个新的 EMR 集群,然后您可以指定 bootstrap 选项,然后您可以 运行 您的 EMR 命令使用 EmrActivity,完成后它将终止您的 EMR 集群并停用数据管道。
其实,
我已经使用了 AWS 的 Step Functions,它是 Lambda 函数的状态机包装器,因此您可以使用 boto3
来使用 run_job_flow and you can use describe_cluaster to get the status of the cluster. Finally use a choice 启动 EMR Spark 作业。所以你的步骤函数看起来像这样(括号中的步骤函数类型:
运行 job(任务)-> Wait for X min(等待)-> Check status(任务)-> Branch(选择)[ => 返回等待,或者 => 完成]
我在 GitHub 上放了一个完整的示例,展示了如何使用 Boto3 完成所有这些操作。
long-lived 集群示例展示了如何在集群上创建和 运行 作业步骤,该集群从包含历史亚马逊评论数据的 public S3 存储桶中获取数据,执行一些 PySpark对其进行处理,并将输出写回 S3 存储桶。
- 创建 Amazon S3 存储桶并上传作业脚本。
- 创建演示使用的 AWS Identity and Access Management (IAM) 角色。
- 创建演示使用的 Amazon Elastic Compute Cloud (Amazon EC2) 安全组。
- 创建 short-lived 和 long-lived 集群并在其上执行 运行 作业步骤。
- 终止集群并清理所有资源。