带有 Amazon SQS 和 S3 事件的 Celery

Celery with Amazon SQS and S3 events

我想使用 Celery 来消费 Amazon 在 SQS 上交付的 S3 事件。但是,S3 message format does not match what Celery expects.

我怎样才能以最少的黑客行为使用这些消息?我应该编写自定义序列化程序吗?我应该放弃并使用 boto 或 boto3 制作自定义桥吗?

作为旁注,如果重要的话,我还想将 Celery 连接到另一个代理 (RabbitMQ) 以用于其余的应用程序消息传递。

您将需要创建一个侦听 S3 通知的服务,然后 运行s 适当的 celery 任务。

您有多种选择 - S3 通知通过 SQS、SNS 或 AWS Lambda 发出。

事实上,最简单的选择可能是根本不使用 Celery,而只是在 AWS Lambda 中向 运行 编写一些代码。我没有使用过这项服务(Lambda 相对较新),但看起来这意味着您不必使用,例如运行 监控服务或芹菜工人。

对于我的特定用例,事实证明最简单的方法是创建一个 bridge worker,它轮询 SQS 并使用默认代理将任务交给 Celery。

不难做到(尽管 boto 和 SQS 可以使用更多文档),而且 Celery 不太适合同时连接到两个不同的代理,所以感觉这是最好的方法。

配置 AWS S3 事件以调用 AWS Lambda 函数。编写函数将S3事件消息转换为Celery消息格式,然后将Celery消息发布到SQS。 Celery 会接收来自 SQS 的消息。

S3 事件 -> Lambda -> SQS -> Celery

Amazon S3 发送的用于发布事件的通知消息采用JSON 格式。 所以你可以配置你的芹菜序列化json。 下面是我的配置文件的摘录(使用 django)。

# AWS Credentials
AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')

# Celery
BROKER_URL = "sqs://%s:%s@" % (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_DEFAULT_QUEUE = '<queue_name>'
CELERY_RESULT_BACKEND = None # Disabling the results backend

BROKER_TRANSPORT_OPTIONS = {
    'region': 'us-west-2',
    'polling_interval': 20,
}