How to poll messages from SQS using a Celery worker when the messages are in JSON format and the worker cannot decode them

How can I poll messages from SQS using a Celery worker? The messages are in JSON format, and the worker is not able to decode them.

Note: these messages are not sent to SQS by celery beat. The queue is subscribed to an SNS topic.

My celery worker command is: celery worker -A status_handling -l info -Q es_status_test

Message in the queue:

{
  "Type" : "Notification",
  "MessageId" : "f7e40fd9-8f92-59c5-afd9-5a1847aaae57",
  "TopicArn" : "***",
  "Message" : "{\"SESResponseStatusCode\": 200, \"Status\": \"Delivered\", \"Message\": \"Email sent successfully.\", \"MessageId\": \"a59e85a2-8b7a-4b49-9354-0a7a4170b0c0\", \"Uuid\": null}",
  "Timestamp" : "2019-08-05T06:00:24.943Z",
  "SignatureVersion" : "1",
  "Signature" : "pass",
  "SigningCertURL" : "pass",
  "UnsubscribeURL" : "pass"
}

Here is the error:

[2019-08-04 23:00:25,116: CRITICAL/MainProcess] Unrecoverable error: JSONDecodeError('Expecting value: line 1 column 1 (char 0)')
Traceback (most recent call last):
  File "/home/vagrant/env/lib/python3.7/site-packages/celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
  File "/home/vagrant/env/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/home/vagrant/env/lib/python3.7/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "/home/vagrant/env/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "/home/vagrant/env/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/home/vagrant/env/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 596, in start
    c.loop(*c.loop_args())
  File "/home/vagrant/env/lib/python3.7/site-packages/celery/worker/loops.py", line 91, in asynloop
    next(loop)
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/hub.py", line 362, in create_loop
    cb(*cbargs)
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/http/curl.py", line 111, in on_readable
    return self._on_event(fd, _pycurl.CSELECT_IN)
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/http/curl.py", line 124, in _on_event
    self._process_pending_requests()
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/http/curl.py", line 130, in _process_pending_requests
    self._process(curl)
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/http/curl.py", line 178, in _process
    buffer=buffer, effective_url=effective_url, error=error,
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 177, in __call__
    svpending(*ca, **ck)
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 170, in __call__
    return self.throw()
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 167, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/funtools.py", line 100, in _transback
    return callback(ret)
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 170, in __call__
    return self.throw()
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 167, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/funtools.py", line 100, in _transback
    return callback(ret)
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 170, in __call__
    return self.throw()
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 167, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/funtools.py", line 98, in _transback
    callback.throw()
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/funtools.py", line 96, in _transback
    ret = filter_(*args + (ret,), **kwargs)
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/transport/SQS.py", line 370, in _on_messages_ready
    msg_parsed = self._message_to_python(msg, qname, queue)
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/transport/SQS.py", line 236, in _message_to_python
    payload = loads(bytes_to_str(body))
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/utils/json.py", line 94, in loads
    return stdjson.loads(s)
  File "/usr/local/lib/python3.7/json/__init__.py", line 348, in loads
    return _default_decoder.decode(s)
  File "/usr/local/lib/python3.7/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.7/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

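SQS support in Celery is only a transport mechanism for Celery-specific messages. You cannot use Celery to consume arbitrary SQS messages. The traceback shows this: kombu's SQS transport fails in _message_to_python while trying to parse the message body as one of its own messages, and an SNS notification does not have that shape.

Instead, I suggest writing a custom Django management command that polls the SQS queue with the boto3 library.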
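
A minimal sketch of that polling approach, assuming the SNS subscription delivers the full notification envelope as in the sample message in the question (raw message delivery turned off). The names parse_sns_notification, poll_queue, handle, and max_batches are hypothetical and only for illustration; the boto3 calls used are receive_message and delete_message on the SQS client. Error handling, credentials, and region configuration are left out.

```python
import json


def parse_sns_notification(body):
    """Return the inner payload of an SNS notification delivered to SQS."""
    envelope = json.loads(body)  # outer SNS envelope ("Type", "MessageId", ...)
    # The "Message" field is itself a JSON-encoded string, so decode it again.
    return json.loads(envelope["Message"])


def poll_queue(queue_url, handle, max_batches=None):
    """Long-poll the queue, pass each decoded payload to handle(), then delete it.

    queue_url and handle are supplied by the caller (hypothetical names).
    max_batches=None polls forever; an integer stops after that many receives.
    """
    import boto3  # imported here so the parser above works without boto3 installed

    sqs = boto3.client("sqs")
    batches = 0
    while max_batches is None or batches < max_batches:
        response = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,  # SQS returns at most 10 messages per call
            WaitTimeSeconds=20,      # long polling, avoids a busy loop
        )
        # "Messages" is absent from the response when the queue is empty.
        for message in response.get("Messages", []):
            handle(parse_sns_notification(message["Body"]))
            # Delete only after handle() returned without raising.
            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=message["ReceiptHandle"],
            )
        batches += 1
```

In a Django project, poll_queue could be called from the handle() method of a custom management command, with the handler function updating the status records. If the real work should still run in Celery, the handler can instead call an existing task's delay() with the decoded payload, so that Celery only ever sees messages it produced itself.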