使用 rabbitmq 和 django 通道时出现 msgpack 错误

msgpack error when using rabbitmq and django channels

总的来说,我对 django 通道和消息队列有点陌生。

我的要求如下:

网页与 django 服务器建立 websocket 连接 django 服务器需要订阅 rabbitMQ 服务器上的频道(基于用户名) 当消息到达订阅的频道时,将其路由到适当的用户网络套接字,然后网页更新 UI 我有一个基本的 websocket 示例应用程序按照 http://channels.readthedocs.io/en/stable/

工作

现在我正在尝试处理来自 rabbitmq 通道的消息

我有以下路由:

routes = [
  route("websocket.receive", ws_message),
  route("websocket.connect", ws_accept),
  route("hello", hello_message),
]

和以下消费者:

import sys
import logging

logger = logging.getLogger('test')

def ws_message(message):
    logger.debug('---------- Got message on web socket --------------------')
    message.reply_channel.send({"text": message.content['text']})


def ws_accept(message):
    logger.debug('--------- Accepted Web Socket connection ----------------')
    message.reply_channel.send({"accept": True})


def hello_message():
    logger.debug('---------- Got message on MQ --------------------')

我编写了一个小的外部脚本来将消息发送到 "hello" 频道:

#!/usr/bin/env python

import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello', arguments={'x-expires': 120000, 'x-dead-letter-exchange': 'dead-letters'})

print 'Sending: ' + sys.argv[1];

channel.basic_publish(exchange='', routing_key='hello', body=sys.argv[1])
connection.close()

当我 运行 这个脚本并发送消息时,我在 django 运行 服务器输出上收到以下错误:

python2 manage.py runserver
Performing system checks...

System check identified some issues:

WARNINGS:
?: (1_8.W001) The standalone TEMPLATE_* settings were deprecated in Django 1.8 and the TEMPLATES dictionary takes precedence. You must put the values of the following settings into your default TEMPLATES dict: TEMPLATE_DIRS, TEMPLATE_LOADERS.

System check identified 1 issue (0 silenced).
August 24, 2017 - 10:06:04
Django version 1.11.4, using settings 'jarvice_channels.settings'
Starting Channels development server at http://127.0.0.1:8000/
Channel layer default (asgi_rabbitmq.core.RabbitmqChannelLayer)
Quit the server with CONTROL-C.
2017-08-24 10:06:04,788 - INFO - worker - Listening on channels hello, http.request, websocket.connect, websocket.disconnect, websocket.receive
2017-08-24 10:06:04,789 - INFO - worker - Listening on channels hello, http.request, websocket.connect, websocket.disconnect, websocket.receive
2017-08-24 10:06:04,790 - INFO - worker - Listening on channels hello, http.request, websocket.connect, websocket.disconnect, websocket.receive
2017-08-24 10:06:04,790 - INFO - worker - Listening on channels hello, http.request, websocket.connect, websocket.disconnect, websocket.receive
2017-08-24 10:06:04,792 - INFO - server - HTTP/2 support not enabled (install the http2 and tls Twisted extras)
2017-08-24 10:06:04,792 - INFO - server - Using busy-loop synchronous mode on channel layer
2017-08-24 10:06:04,792 - INFO - server - Listening on endpoint tcp:port=8000:interface=127.0.0.1
Exception in thread Thread-4:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/home/rep/rmqtest_env/lib/python2.7/site-packages/channels/management/commands/runserver.py", line 175, in run
    worker.run()
  File "/home/rep/rmqtest_env/lib/python2.7/site-packages/channels/worker.py", line 87, in run
    channel, content = self.channel_layer.receive_many(channels, block=True)
  File "/home/rep/rmqtest_env/lib/python2.7/site-packages/asgiref/base_layer.py", line 43, in receive_many
    return self.receive(channels, block)
  File "/home/rep/rmqtest_env/lib/python2.7/site-packages/asgi_rabbitmq/core.py", line 822, in receive
    return future.result()
  File "/home/rep/rmqtest_env/lib/python2.7/site-packages/concurrent/futures/_base.py", line 429, in result
    return self.__get_result()
  File "/home/rep/rmqtest_env/lib/python2.7/site-packages/concurrent/futures/_base.py", line 381, in __get_result
    raise exception_type, self._exception, self._traceback
ExtraData: unpack(b) received extra data.

所以消息正在通过,但反序列化不知何故失败了.... 给出了什么?

django 频道是否需要特定的消息格式?

问题是发送的消息需要使用msgpack打包(并在django通道中镜像消息的结构),像这样:

import msgpack
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.basic_publish(exchange='chat',
                  routing_key='external_or_whatever_you_desire',
                  body=msgpack.packb({'text': "Hello World!"})
                  )
channel.close()

当然,交易所应该与 RabbitMQ 上的交易所之一匹配(如果您使用聊天示例,上面的内容应该有效)。