如何从 django-celery 3 任务发送频道 2.x 组消息?
How do I send channels 2.x group message from django-celery 3 task?
我需要推迟发送频道消息。这是我的代码:
# consumers.py
class ChatConsumer(WebsocketConsumer):
def chat_message(self, event):
self.send(text_data=json.dumps(event['message']))
def connect(self):
self.channel_layer.group_add(self.room_name, self.channel_name)
self.accept()
def receive(self, text_data=None, bytes_data=None):
send_message_task.apply_async(
args=(
self.room_name,
{'type': 'chat_message',
'message': 'the message'}
),
countdown=10
)
# tasks.py
@shared_task
def send_message_task(room_name, message):
layer = get_channel_layer()
layer.group_send(room_name, message)
任务正在执行中,我看不到任何错误,但消息未发送。只有当我从消费者 class 方法发送它时它才有效。
我还尝试使用 AsyncWebsocketConsumer 并使用 AsyncToSync(layer.group_send) 发送。它错误 "You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly."
然后我尝试将 send_message_task 声明为异步并使用 await。什么也没有发生(没有错误),我不确定任务是否被执行了。
以下是版本:
Django==1.11.13
redis==2.10.5
django-celery==3.2.2
channels==2.1.2
channels_redis==2.2.1
设置:
REDIS_HOST = os.getenv('REDIS_HOST', '127.0.0.1')
BROKER_URL = 'redis://{}:6379/0'.format(REDIS_HOST)
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": ['redis://{}:6379/1'.format(REDIS_HOST)],
},
},
}
有什么想法吗?
UPD: 刚刚发现 redis 通道层被检索但是它的 group_send 方法没有被调用,只是被跳过了。
更新 2: 使用 AsyncToSync(layer.group_send)
从控制台发送。没有 apply_async
的调用任务也可以。但是 运行 它与 apply_async
会导致错误 You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly
。将任务定义为异步并使用 await
当然也会破坏所有内容。
您的代码中的问题是您在类型 chat_message
中使用了 underscore
。我相信您在文档中错过了它:
The name of the method will be the type of the event with periods
replaced by underscores - so, for example, an event coming in over the
channel layer with a type of chat.join
will be handled by the method
chat_join
.
所以在你的情况下,类型将是 chat.message
{
'type': 'chat.message',
'message': 'the message'
}
使用通道层时,连接时需要 async_to_sync()
包装器,因为所有通道层方法都是异步的。
def connect(self):
async_to_sync(self.channel_layer.group_add(
self.room_name, self.channel_name)
self.accept()
从您的 celery 任务发送消息也是如此。
@shared_task
def send_message_task(room_name, message):
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
room_name,
{'type': 'chat_message', 'message': message}
)
您也可以像这样从您的消费者 receive()
调用您的 celery 任务:
send_message_task.delay(self.room_name, 'your message here')
关于 AsyncToSync 错误,您需要将频道和 daphne 升级到更新版本 explained in this thread。
我发现了一个丑陋且低效的决定,但它有效:
@shared_task
def send_message_task(room_name, message):
def sender(room_name, message):
channel_layer = get_channel_layer()
AsyncToSync(channel_layer.group_send)(
room_name,
{'type': 'chat_message', 'message': message}
)
thread = threading.Thread(target=sender, args=(room_name, message,))
thread.start()
thread.join()
如果有人可以改进它,我将不胜感激。
也许这不是对起始问题的直接回答,但这可能会有所帮助。
如果你得到异常 "You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly" 那么你可能会做一些这样的事情:
- 事件循环在某处创建
- 一些 ASYNC 代码已启动
- 从 ASYNC 代码调用一些 SYNC 代码
- SYNC 代码正在尝试使用 AsyncToSync 调用 ASYNC 代码,这会阻止这种情况
似乎 AsyncToSync 检测到外部事件循环并决定不干扰它。
解决方案是直接将异步调用包含在外部事件循环中。
示例代码如下,但最好检查您的情况,外循环是 运行 ...
loop = asyncio.get_event_loop()
loop.create_task(layer.group_send(room_name, {'type': 'chat_message', 'message': message}))
我需要推迟发送频道消息。这是我的代码:
# consumers.py
class ChatConsumer(WebsocketConsumer):
def chat_message(self, event):
self.send(text_data=json.dumps(event['message']))
def connect(self):
self.channel_layer.group_add(self.room_name, self.channel_name)
self.accept()
def receive(self, text_data=None, bytes_data=None):
send_message_task.apply_async(
args=(
self.room_name,
{'type': 'chat_message',
'message': 'the message'}
),
countdown=10
)
# tasks.py
@shared_task
def send_message_task(room_name, message):
layer = get_channel_layer()
layer.group_send(room_name, message)
任务正在执行中,我看不到任何错误,但消息未发送。只有当我从消费者 class 方法发送它时它才有效。
我还尝试使用 AsyncWebsocketConsumer 并使用 AsyncToSync(layer.group_send) 发送。它错误 "You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly."
然后我尝试将 send_message_task 声明为异步并使用 await。什么也没有发生(没有错误),我不确定任务是否被执行了。
以下是版本:
Django==1.11.13
redis==2.10.5
django-celery==3.2.2
channels==2.1.2
channels_redis==2.2.1
设置:
REDIS_HOST = os.getenv('REDIS_HOST', '127.0.0.1')
BROKER_URL = 'redis://{}:6379/0'.format(REDIS_HOST)
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": ['redis://{}:6379/1'.format(REDIS_HOST)],
},
},
}
有什么想法吗?
UPD: 刚刚发现 redis 通道层被检索但是它的 group_send 方法没有被调用,只是被跳过了。
更新 2: 使用 AsyncToSync(layer.group_send)
从控制台发送。没有 apply_async
的调用任务也可以。但是 运行 它与 apply_async
会导致错误 You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly
。将任务定义为异步并使用 await
当然也会破坏所有内容。
您的代码中的问题是您在类型 chat_message
中使用了 underscore
。我相信您在文档中错过了它:
The name of the method will be the type of the event with periods replaced by underscores - so, for example, an event coming in over the channel layer with a type of
chat.join
will be handled by the methodchat_join
.
所以在你的情况下,类型将是 chat.message
{
'type': 'chat.message',
'message': 'the message'
}
使用通道层时,连接时需要 async_to_sync()
包装器,因为所有通道层方法都是异步的。
def connect(self):
async_to_sync(self.channel_layer.group_add(
self.room_name, self.channel_name)
self.accept()
从您的 celery 任务发送消息也是如此。
@shared_task
def send_message_task(room_name, message):
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
room_name,
{'type': 'chat_message', 'message': message}
)
您也可以像这样从您的消费者 receive()
调用您的 celery 任务:
send_message_task.delay(self.room_name, 'your message here')
关于 AsyncToSync 错误,您需要将频道和 daphne 升级到更新版本 explained in this thread。
我发现了一个丑陋且低效的决定,但它有效:
@shared_task
def send_message_task(room_name, message):
def sender(room_name, message):
channel_layer = get_channel_layer()
AsyncToSync(channel_layer.group_send)(
room_name,
{'type': 'chat_message', 'message': message}
)
thread = threading.Thread(target=sender, args=(room_name, message,))
thread.start()
thread.join()
如果有人可以改进它,我将不胜感激。
也许这不是对起始问题的直接回答,但这可能会有所帮助。 如果你得到异常 "You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly" 那么你可能会做一些这样的事情:
- 事件循环在某处创建
- 一些 ASYNC 代码已启动
- 从 ASYNC 代码调用一些 SYNC 代码
- SYNC 代码正在尝试使用 AsyncToSync 调用 ASYNC 代码,这会阻止这种情况
似乎 AsyncToSync 检测到外部事件循环并决定不干扰它。
解决方案是直接将异步调用包含在外部事件循环中。 示例代码如下,但最好检查您的情况,外循环是 运行 ...
loop = asyncio.get_event_loop()
loop.create_task(layer.group_send(room_name, {'type': 'chat_message', 'message': message}))