msgpack error when using RabbitMQ and Django Channels
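I'm fairly new to Django Channels and to message queues in general.
My requirements are as follows:
1. A web page opens a websocket connection to the Django server.
2. The Django server needs to subscribe to a channel on the RabbitMQ server (based on the username).
3. When a message arrives on the subscribed channel, it is routed to the appropriate user's websocket, and the web page then updates the UI.
I have a basic websocket example app working, following http://channels.readthedocs.io/en/stable/
Now I am trying to handle messages coming from a RabbitMQ channel.
I have the following routing: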
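from channels.routing import route

# ws_message, ws_accept and hello_message are the consumers shown further down.
routes = [
    route("websocket.receive", ws_message),
    route("websocket.connect", ws_accept),
    route("hello", hello_message),
]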
and the following consumers:
import logging

logger = logging.getLogger('test')


def ws_message(message):
    # Echo the received text back to the socket that sent it.
    logger.debug('---------- Got message on web socket --------------------')
    message.reply_channel.send({"text": message.content['text']})


def ws_accept(message):
    # Accept the incoming websocket handshake.
    logger.debug('--------- Accepted Web Socket connection ----------------')
    message.reply_channel.send({"accept": True})


def hello_message(message):
    # Consumers are called with the message, so the parameter is required.
    logger.debug('---------- Got message on MQ --------------------')
I wrote a small external script to send a message to the "hello" channel:
#!/usr/bin/env python
import sys

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello', arguments={'x-expires': 120000, 'x-dead-letter-exchange': 'dead-letters'})
print('Sending: ' + sys.argv[1])
# The body is published as the raw command-line string.
channel.basic_publish(exchange='', routing_key='hello', body=sys.argv[1])
connection.close()
When I run this script and send a message, I get the following error in the Django runserver output:
python2 manage.py runserver
Performing system checks...
System check identified some issues:
WARNINGS:
?: (1_8.W001) The standalone TEMPLATE_* settings were deprecated in Django 1.8 and the TEMPLATES dictionary takes precedence. You must put the values of the following settings into your default TEMPLATES dict: TEMPLATE_DIRS, TEMPLATE_LOADERS.
System check identified 1 issue (0 silenced).
August 24, 2017 - 10:06:04
Django version 1.11.4, using settings 'jarvice_channels.settings'
Starting Channels development server at http://127.0.0.1:8000/
Channel layer default (asgi_rabbitmq.core.RabbitmqChannelLayer)
Quit the server with CONTROL-C.
2017-08-24 10:06:04,788 - INFO - worker - Listening on channels hello, http.request, websocket.connect, websocket.disconnect, websocket.receive
2017-08-24 10:06:04,789 - INFO - worker - Listening on channels hello, http.request, websocket.connect, websocket.disconnect, websocket.receive
2017-08-24 10:06:04,790 - INFO - worker - Listening on channels hello, http.request, websocket.connect, websocket.disconnect, websocket.receive
2017-08-24 10:06:04,790 - INFO - worker - Listening on channels hello, http.request, websocket.connect, websocket.disconnect, websocket.receive
2017-08-24 10:06:04,792 - INFO - server - HTTP/2 support not enabled (install the http2 and tls Twisted extras)
2017-08-24 10:06:04,792 - INFO - server - Using busy-loop synchronous mode on channel layer
2017-08-24 10:06:04,792 - INFO - server - Listening on endpoint tcp:port=8000:interface=127.0.0.1
Exception in thread Thread-4:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/home/rep/rmqtest_env/lib/python2.7/site-packages/channels/management/commands/runserver.py", line 175, in run
    worker.run()
  File "/home/rep/rmqtest_env/lib/python2.7/site-packages/channels/worker.py", line 87, in run
    channel, content = self.channel_layer.receive_many(channels, block=True)
  File "/home/rep/rmqtest_env/lib/python2.7/site-packages/asgiref/base_layer.py", line 43, in receive_many
    return self.receive(channels, block)
  File "/home/rep/rmqtest_env/lib/python2.7/site-packages/asgi_rabbitmq/core.py", line 822, in receive
    return future.result()
  File "/home/rep/rmqtest_env/lib/python2.7/site-packages/concurrent/futures/_base.py", line 429, in result
    return self.__get_result()
  File "/home/rep/rmqtest_env/lib/python2.7/site-packages/concurrent/futures/_base.py", line 381, in __get_result
    raise exception_type, self._exception, self._traceback
ExtraData: unpack(b) received extra data.
So the message is getting through, but deserialization somehow fails.
What gives?
Does Django Channels require a specific message format?
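The problem is that the message you send needs to be packed with msgpack (and needs to mirror the structure of messages in Django Channels), like this: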
import msgpack
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Pack the body with msgpack so the channel layer can deserialize it.
channel.basic_publish(
    exchange='chat',
    routing_key='external_or_whatever_you_desire',
    body=msgpack.packb({'text': "Hello World!"}),
)
# Closing the connection also closes the channel opened on it.
connection.close()
Of course, the exchange should match one of the exchanges that exist on RabbitMQ (if you are using the chat example, the above should work).
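As for why a plain string body produces "ExtraData: unpack(b) received extra data", it may help to look at how a msgpack decoder reads the first byte of a payload. The sketch below uses only the standard library and is an illustration of the msgpack format, not the code of msgpack or asgi_rabbitmq; the function names `first_msgpack_type` and `split_leading_fixint` are made up for this example. In the msgpack format, any byte from 0x00 to 0x7f is a complete one-byte integer, and every ASCII character falls in that range, so a decoder given raw ASCII text reads one small integer and then finds unread bytes behind it.

```python
def first_msgpack_type(payload):
    """Name the msgpack type announced by the first byte of payload."""
    first = bytearray(payload)[0]  # works for Python 2 str and Python 3 bytes
    if first <= 0x7f:
        return "positive fixint"   # a complete value in a single byte
    if first <= 0x8f:
        return "fixmap"            # what a small packed dict starts with
    if first <= 0x9f:
        return "fixarray"
    if first <= 0xbf:
        return "fixstr"
    if first >= 0xe0:
        return "negative fixint"
    return "other"                 # 0xc0-0xdf: nil, bool, bin, ext, sized types


def split_leading_fixint(payload):
    """Return (value, leftover bytes) when payload starts with a positive fixint."""
    data = bytearray(payload)
    if data[0] > 0x7f:
        raise ValueError("payload does not start with a positive fixint")
    return data[0], bytes(data[1:])


if __name__ == "__main__":
    # Raw text, as published by the script in the question.
    print(first_msgpack_type(b"hello"))
    print(split_leading_fixint(b"hello"))
    # First bytes of a packed one-entry dict such as {'text': ...}.
    print(first_msgpack_type(b"\x81\xa4text"))
```

For the body "hello", the first byte is "h" (104), which decodes as the integer 104 and leaves "ello" unread. That leftover is presumably what the ExtraData exception in the traceback is complaining about, whereas a dict packed with msgpack.packb starts with a map marker and is consumed in full.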