如何使用 celery worker 从 SQS 轮询消息,消息采用 JSON 格式,worker 无法解码该格式
how to poll message from SQS using celery worker, the message is in JSON format and worker not able to decode the format
如何使用 celery worker 从 SQS 轮询消息,消息采用 JSON 格式,而 worker 无法解码格式
注意:这些消息不会使用 celery beat 发送到 SQS,这个队列是从 SNS 订阅的
我的芹菜工人命令是:
芹菜工人-A status_handling -l 信息-Q es_status_test
Msg in Queue:
{
"Type" : "Notification",
"MessageId" : "f7e40fd9-8f92-59c5-afd9-5a1847aaae57",
"TopicArn" : "***",
"Message" : "{\"SESResponseStatusCode\": 200, \"Status\": \"Delivered\", \"Message\": \"Email sent successfully.\", \"MessageId\": \"a59e85a2-8b7a-4b49-9354-0a7a4170b0c0\", \"Uuid\": null}",
"Timestamp" : "2019-08-05T06:00:24.943Z",
"SignatureVersion" : "1",
"Signature" : "pass",
"SigningCertURL" : "pass",
"UnsubscribeURL" : "pass"
}
错误来了:
[2019-08-04 23:00:25,116: CRITICAL/MainProcess] Unrecoverable error: JSONDecodeError('Expecting value: line 1 column 1 (char 0)')
Traceback (most recent call last):
File "/home/vagrant/env/lib/python3.7/site-packages/celery/worker/worker.py", line 205, in start
self.blueprint.start(self)
File "/home/vagrant/env/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "/home/vagrant/env/lib/python3.7/site-packages/celery/bootsteps.py", line 369, in start
return self.obj.start()
File "/home/vagrant/env/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
blueprint.start(self)
File "/home/vagrant/env/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "/home/vagrant/env/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 596, in start
c.loop(*c.loop_args())
File "/home/vagrant/env/lib/python3.7/site-packages/celery/worker/loops.py", line 91, in asynloop
next(loop)
File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/hub.py", line 362, in create_loop
cb(*cbargs)
File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/http/curl.py", line 111, in on_readable
return self._on_event(fd, _pycurl.CSELECT_IN)
File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/http/curl.py", line 124, in _on_event
self._process_pending_requests()
File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/http/curl.py", line 130, in _process_pending_requests
self._process(curl)
File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/http/curl.py", line 178, in _process
buffer=buffer, effective_url=effective_url, error=error,
File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 177, in __call__
svpending(*ca, **ck)
File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 170, in __call__
return self.throw()
File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 167, in __call__
retval = fun(*final_args, **final_kwargs)
File "/home/vagrant/env/lib/python3.7/site-packages/vine/funtools.py", line 100, in _transback
return callback(ret)
File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 170, in __call__
return self.throw()
File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 167, in __call__
retval = fun(*final_args, **final_kwargs)
File "/home/vagrant/env/lib/python3.7/site-packages/vine/funtools.py", line 100, in _transback
return callback(ret)
File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 170, in __call__
return self.throw()
File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 167, in __call__
retval = fun(*final_args, **final_kwargs)
File "/home/vagrant/env/lib/python3.7/site-packages/vine/funtools.py", line 98, in _transback
callback.throw()
File "/home/vagrant/env/lib/python3.7/site-packages/vine/funtools.py", line 96, in _transback
ret = filter_(*args + (ret,), **kwargs)
File "/home/vagrant/env/lib/python3.7/site-packages/kombu/transport/SQS.py", line 370, in _on_messages_ready
msg_parsed = self._message_to_python(msg, qname, queue)
File "/home/vagrant/env/lib/python3.7/site-packages/kombu/transport/SQS.py", line 236, in _message_to_python
payload = loads(bytes_to_str(body))
File "/home/vagrant/env/lib/python3.7/site-packages/kombu/utils/json.py", line 94, in loads
return stdjson.loads(s)
File "/usr/local/lib/python3.7/json/__init__.py", line 348, in loads
return _default_decoder.decode(s)
File "/usr/local/lib/python3.7/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/local/lib/python3.7/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
Celery 中的 SQS 支持仅用作 Celery 特定消息的传输机制。您不能使用 Celery 来消费任意 SQS 消息。
相反,我建议编写自定义 Django 管理命令,在其中使用 boto3 库轮询 SQS 队列。
如何使用 celery worker 从 SQS 轮询消息,消息采用 JSON 格式,而 worker 无法解码格式
注意:这些消息不会使用 celery beat 发送到 SQS,这个队列是从 SNS 订阅的
我的芹菜工人命令是: 芹菜工人-A status_handling -l 信息-Q es_status_test
Msg in Queue:
{
"Type" : "Notification",
"MessageId" : "f7e40fd9-8f92-59c5-afd9-5a1847aaae57",
"TopicArn" : "***",
"Message" : "{\"SESResponseStatusCode\": 200, \"Status\": \"Delivered\", \"Message\": \"Email sent successfully.\", \"MessageId\": \"a59e85a2-8b7a-4b49-9354-0a7a4170b0c0\", \"Uuid\": null}",
"Timestamp" : "2019-08-05T06:00:24.943Z",
"SignatureVersion" : "1",
"Signature" : "pass",
"SigningCertURL" : "pass",
"UnsubscribeURL" : "pass"
}
错误来了:
[2019-08-04 23:00:25,116: CRITICAL/MainProcess] Unrecoverable error: JSONDecodeError('Expecting value: line 1 column 1 (char 0)')
Traceback (most recent call last):
File "/home/vagrant/env/lib/python3.7/site-packages/celery/worker/worker.py", line 205, in start
self.blueprint.start(self)
File "/home/vagrant/env/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "/home/vagrant/env/lib/python3.7/site-packages/celery/bootsteps.py", line 369, in start
return self.obj.start()
File "/home/vagrant/env/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
blueprint.start(self)
File "/home/vagrant/env/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "/home/vagrant/env/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 596, in start
c.loop(*c.loop_args())
File "/home/vagrant/env/lib/python3.7/site-packages/celery/worker/loops.py", line 91, in asynloop
next(loop)
File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/hub.py", line 362, in create_loop
cb(*cbargs)
File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/http/curl.py", line 111, in on_readable
return self._on_event(fd, _pycurl.CSELECT_IN)
File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/http/curl.py", line 124, in _on_event
self._process_pending_requests()
File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/http/curl.py", line 130, in _process_pending_requests
self._process(curl)
File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/http/curl.py", line 178, in _process
buffer=buffer, effective_url=effective_url, error=error,
File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 177, in __call__
svpending(*ca, **ck)
File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 170, in __call__
return self.throw()
File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 167, in __call__
retval = fun(*final_args, **final_kwargs)
File "/home/vagrant/env/lib/python3.7/site-packages/vine/funtools.py", line 100, in _transback
return callback(ret)
File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 170, in __call__
return self.throw()
File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 167, in __call__
retval = fun(*final_args, **final_kwargs)
File "/home/vagrant/env/lib/python3.7/site-packages/vine/funtools.py", line 100, in _transback
return callback(ret)
File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 170, in __call__
return self.throw()
File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 167, in __call__
retval = fun(*final_args, **final_kwargs)
File "/home/vagrant/env/lib/python3.7/site-packages/vine/funtools.py", line 98, in _transback
callback.throw()
File "/home/vagrant/env/lib/python3.7/site-packages/vine/funtools.py", line 96, in _transback
ret = filter_(*args + (ret,), **kwargs)
File "/home/vagrant/env/lib/python3.7/site-packages/kombu/transport/SQS.py", line 370, in _on_messages_ready
msg_parsed = self._message_to_python(msg, qname, queue)
File "/home/vagrant/env/lib/python3.7/site-packages/kombu/transport/SQS.py", line 236, in _message_to_python
payload = loads(bytes_to_str(body))
File "/home/vagrant/env/lib/python3.7/site-packages/kombu/utils/json.py", line 94, in loads
return stdjson.loads(s)
File "/usr/local/lib/python3.7/json/__init__.py", line 348, in loads
return _default_decoder.decode(s)
File "/usr/local/lib/python3.7/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/local/lib/python3.7/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
Celery 中的 SQS 支持仅用作 Celery 特定消息的传输机制。您不能使用 Celery 来消费任意 SQS 消息。
相反,我建议编写自定义 Django 管理命令,在其中使用 boto3 库轮询 SQS 队列。