Amazon 批处理作业进程队列

Amazon batch job process queue

我正在使用 AWS EC2 实例进行生物信息学工作。我有一些(~1000)个大文件,应该在 EC2 实例上用脚本处理,结果应该上传回 S3 存储桶。我想将作业(文件)分发到多个 EC2 实例,优先以现货价格开始。

我需要的是一个简单易用的排队系统(可能是 AWS SQS 或其他),它可以将作业分配给实例并在实例失败(由于现货价格过高或其他原因)时重新启动作业。我研究过 AWS SQS 示例,但这些示例过于高级,通常涉及自动缩放和复杂的消息生成应用程序。

有人可以从概念上指出如何以最简单的方式最好地解决这个问题吗? AWS SQS 这种简单应用的任何示例?一堆实例应该如何启动,如何告诉then监听队列?

对于每个输入文件,我的工作流程基本上是这样的:

aws s3 cp s3://mybucket/file localFile ## Possibly streaming the file without copy
work.py --input localFile --output outputFile
aws s3 cp outputFile s3://mybucket/output/outputFile

您描述的是一种非常常见的面向批处理的设计模式:

  • 工作放在队列
  • 一个或多个 "worker" 个实例 从队列中拉取 工作
  • worker 实例的数量 根据队列的大小和工作的紧急程度 扩展
  • 使用现货定价最大限度地降低成本

完成此操作的最佳方法是:

  • 使用 Amazon Simple Queuing Service (SQS) 存储工作请求
  • 正在启动 Amazon EC2 实例,每个都重复:
    • 从队列
    • 中拉取一条消息
    • 处理 消息(例如通过上面列出的 download/process/upload 步骤)
    • 从队列中删除消息(表示工作已完成)
  • 利用Auto-Scaling控制实例数量,在积压多的情况下启动更多的实例,在有大量积压的情况下关闭所有实例没有工作
  • 对 Auto-Scaling 组使用 现货定价,一旦现货价格降低您的最高出价
  • ,实例将自动"come back to life"

与其使用排队系统 "distribute jobs to instances and restart jobs if an instance fails",SQS 将仅用于存储作业。 Auto-Scaling 将负责启动实例(包括重启是现货价格变化),实例本身将从队列中拉取工作。将其视为 "pull" 模型而不是 "push" 模型。

虽然整个系统可能看起来很复杂,但每个单独的组件都非常简单。我建议一步一步来:

  1. 有一个系统以某种方式将工作请求推送到 SQS 队列。这可能就像从 CLI 使用 aws sqs put-message 一样简单,或者在 Python using Boto(Python 的 AWS 开发工具包)中添加几行代码。

下面是一些示例代码(在命令行中使用消息调用它):

#!/usr/bin/python27

import boto, boto.sqs
from boto.sqs.message import Message
from optparse import OptionParser

# Parse command line
parser = OptionParser()
(options, args) = parser.parse_args()

# Send to SQS
q_conn = boto.sqs.connect_to_region('ap-southeast-2')

q = q_conn.get_queue('my-queue')
m = Message()
m.set_body(args[0])
print q.write(m)

print args[0] + ' pushed to Queue'
  1. 配置一个 Amazon EC2 实例,它可以自动启动您的 Python 应用程序或从 SQS 提取并处理您的工作的脚本。使用 User Data 字段在实例启动时触发工作。 运行 来自 shell 脚本的工作流程,或者您也可以将 S3 upload/download 代码编写为 Python 应用程序的一部分(包括不断提取新消息的循环) .

下面是一些从队列中检索消息的代码:

#!/usr/bin/python27

import boto, boto.sqs
from boto.sqs.message import Message

# Connect to Queue
q_conn = boto.sqs.connect_to_region('ap-southeast-2')
q = q_conn.get_queue('my-queue')

# Get a message
m = q.read(visibility_timeout=15)
if m == None:
  print "No message!"
else:
  print m.get_body()
  q.delete_message(m)
  1. 配置与您刚刚为 EC2 创建的配置匹配的自动缩放启动配置。这会告知 Auto-Scaling 如何启动实例(例如实例类型、用户数据)以及您愿意支付的现货价格。
  2. 创建一个 Auto Scaling 组 以自动启动实例。
  3. 配置扩展策略 如果您想要基于队列大小的 Auto Scaling 组 add/remove 个实例

另请参阅:

截至 2016 年 12 月,AWS 推出了一项名为 AWS Batch 的服务 这可能非常适合(甚至非常适合)问题中描述的工作量。请先查看 Batch,然后再选择其他建议之一。