Django - Celery - SQS - S3 - Receiving Messages
I have a Django application that uses Celery, SQS, and S3.
When I run the following function with Django, Celery, and SQS, it works and prints 'hello' every minute, as it should:
from celery.task import periodic_task
from celery.schedules import crontab

@periodic_task(run_every=crontab(hour='*', minute='*', day_of_week='*'))
def print_hello():
    print('hello world')
But the application is also linked to an S3 bucket: whenever a new file is saved to S3, a notification is sent to the SQS queue. The problem occurs when a notification message is sent to the SQS queue. When the notification arrives on the queue, the worker fails: it stops the periodic task print_hello() and gives this error message:
[2019-11-07 22:10:57,173: CRITICAL/MainProcess] Unrecoverable error: Error('Incorrect padding')
...parserinvoker/lib64/python3.7/base64.py", line 87, in b64decode
    return binascii.a2b_base64(s)
binascii.Error: Incorrect padding
It then exits. I have been going through the documentation and troubleshooting all week without finding a solution. I am including my settings.py in case this is a configuration issue.
settings.py
BROKER_URL = "sqs://"
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_DEFAULT_QUEUE = env('CELERY_DEFAULT_QUEUE')
CELERY_RESULT_BACKEND = None
BROKER_TRANSPORT_OPTIONS = {
'region': 'us-east-1',
'polling_interval':20,
'visibility_timeout': 3600,
'task_default_queue': env('CELERY_DEFAULT_QUEUE'),
}
The format of the JSON payload that Celery expects on the queue differs from what SQS receives from S3. To handle these correctly, rather than sending the S3 notifications to the Celery broker queue, you probably want a separate periodic task that checks for them and drains the S3 notification queue. The S3 message body will appear as described in the Amazon documentation here. This is a sample 2.1 record sent from S3 to SQS:
{
"Records":[
{
"eventVersion":"2.1",
"eventSource":"aws:s3",
"awsRegion":"us-west-2",
"eventTime":"The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, when Amazon S3 finished processing the request",
"eventName":"event-type",
"userIdentity":{
"principalId":"Amazon-customer-ID-of-the-user-who-caused-the-event"
},
"requestParameters":{
"sourceIPAddress":"ip-address-where-request-came-from"
},
"responseElements":{
"x-amz-request-id":"Amazon S3 generated request ID",
"x-amz-id-2":"Amazon S3 host that processed the request"
},
"s3":{
"s3SchemaVersion":"1.0",
"configurationId":"ID found in the bucket notification configuration",
"bucket":{
"name":"bucket-name",
"ownerIdentity":{
"principalId":"Amazon-customer-ID-of-the-bucket-owner"
},
"arn":"bucket-ARN"
},
"object":{
"key":"object-key",
"size":object-size,
"eTag":"object eTag",
"versionId":"object version if bucket is versioning-enabled, otherwise null",
"sequencer": "a string representation of a hexadecimal value used to determine event sequence,
only used with PUTs and DELETEs"
}
},
"glacierEventData": {
"restoreEventData": {
"lifecycleRestorationExpiryTime": "The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, of Restore Expiry",
"lifecycleRestoreStorageClass": "Source storage class for restore"
}
}
}
]
}
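The 'Incorrect padding' crash is consistent with this mismatch: Celery's SQS transport (kombu) tries to base64-decode each message body, and a raw S3 JSON notification is not valid base64. A minimal stdlib-only reproduction of the same error:

```python
import base64
import binascii

# A raw S3 notification body is JSON, not base64, so decoding it
# fails the same way the worker's traceback shows.
body = '{"Records": [{"eventSource": "aws:s3"}]}'
try:
    base64.b64decode(body)
except binascii.Error as exc:
    print(exc)  # e.g. "Incorrect padding"
```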
The Celery message format looks like this.
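The separate drain task described above could be sketched as follows. This is only a sketch: the queue URL and the handle_object callback are hypothetical, and the SQS client is passed in (e.g. a boto3 SQS client created with boto3.client('sqs')) so the parsing and poll/delete logic stays testable without AWS:

```python
import json


def extract_s3_objects(message_body):
    """Return (bucket, key) pairs from an S3 event notification body."""
    event = json.loads(message_body)
    return [
        (record['s3']['bucket']['name'], record['s3']['object']['key'])
        for record in event.get('Records', [])
    ]


def drain_queue(sqs_client, queue_url, handle_object):
    """Poll the S3 notification queue once and process any events.

    Call this from a periodic task so the notifications are consumed
    here instead of ever reaching the Celery broker queue.
    """
    resp = sqs_client.receive_message(
        QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=20)
    for msg in resp.get('Messages', []):
        for bucket, key in extract_s3_objects(msg['Body']):
            handle_object(bucket, key)  # your processing logic
        # Delete only after successful processing.
        sqs_client.delete_message(
            QueueUrl=queue_url, ReceiptHandle=msg['ReceiptHandle'])
```

Pointing the bucket notifications at a second SQS queue (separate from CELERY_DEFAULT_QUEUE) and draining it this way keeps the worker's broker queue free of non-Celery payloads.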