Celery SQS 消费者的填充不正确
Incorrect padding on a Celery SQS consumer
我正在尝试为 SQS 队列设置 Celery 消费者。
我将 Celery 4.1.0 与 Python3 一起使用,我通过这种方式向库 boto3 (1.5.28) 发送了一条消息:
response = queue.send_message(MessageBody='Hello World')
到目前为止一切正常。
然后我尝试以这种方式设置 Celery 消费者:
@app.task(base=celery.Task, name='test', bind=True)
def test(self, message):
print(message)
return True
但我得到了这个回溯:
...
File ".../lib/python3.6/site-packages/kombu/transport/SQS.py", line 350, in _on_messages_ready
msg_parsed = self._message_to_python(msg, qname, queue)
File ".../lib/python3.6/site-packages/kombu/transport/SQS.py", line 215, in _message_to_python
body = base64.b64decode(message['Body'].encode())
File ".../lib/python3.6/base64.py", line 87, in b64decode
return binascii.a2b_base64(s)
binascii.Error: Incorrect padding
我做的有什么问题吗?我需要一些特定的配置才能使用 SQS 队列吗?
提前致谢!
您的消息需要经过 base64 编码JSON:
message = {"test":"test"}
message_string = json.dumps(message)
byte_message = base64.b64encode(message_string.encode('utf-8'))
base64_json_string = byte_message.decode()
response = queue.send_message(MessageBody=base64_json_string)
我正在尝试为 SQS 队列设置 Celery 消费者。
我将 Celery 4.1.0 与 Python3 一起使用,我通过这种方式向库 boto3 (1.5.28) 发送了一条消息:
response = queue.send_message(MessageBody='Hello World')
到目前为止一切正常。
然后我尝试以这种方式设置 Celery 消费者:
@app.task(base=celery.Task, name='test', bind=True)
def test(self, message):
print(message)
return True
但我得到了这个回溯:
...
File ".../lib/python3.6/site-packages/kombu/transport/SQS.py", line 350, in _on_messages_ready
msg_parsed = self._message_to_python(msg, qname, queue)
File ".../lib/python3.6/site-packages/kombu/transport/SQS.py", line 215, in _message_to_python
body = base64.b64decode(message['Body'].encode())
File ".../lib/python3.6/base64.py", line 87, in b64decode
return binascii.a2b_base64(s)
binascii.Error: Incorrect padding
我做的有什么问题吗?我需要一些特定的配置才能使用 SQS 队列吗?
提前致谢!
您的消息需要经过 base64 编码JSON:
message = {"test":"test"}
message_string = json.dumps(message)
byte_message = base64.b64encode(message_string.encode('utf-8'))
base64_json_string = byte_message.decode()
response = queue.send_message(MessageBody=base64_json_string)