将消息从 Celery 任务发送到 Channels

Sending message from Celery task to Channels

Django 2.1.1, Django 频道 2.1.3, 芹菜 4.2.1

我在 Celery 中设置了一个任务,在任务结束时,我需要向客户端发送一个 websocket 消息。但是,永远不会发送 websocket 消息。没有抛出任何错误,只是没有发送。

我已经使用 Redis 作为后端设置了一个通道层。从普通的 Django 视图执行此操作效果很好。但是当 Celery 任务中 运行 时,它将消息发送到 Channels,我可以看到 Channels 确实 运行 下面我的 consumers.py 代码中显示的代码,但客户端从未收到网络套接字消息。

tasks.py

def import_job(self):
    # (do task calculations, store in data dict)
    message = {'type': 'send_my_data',
               'data': json.dumps(thecalcs) }
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)('core-data', message)

consumers.py

class AsyncDataConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.channel_group_name = 'core-data'

        # Join the group
        await self.channel_layer.group_add(
            self.channel_group_name,
            self.channel_name
        )
        await self.accept()

    async def disconnect(self, close_code):
        # Leave the group
        await self.channel_layer.group_discard(
            self.channel_group_name,
            self.channel_name
        )

    # Receive message from WebSocket
    async def receive(self, text_data=None, bytes_data=None):
        pass

    # Receive message from the group
    async def send_my_data(self, event):
        text = event['data']
        # Send message to WebSocket
        await self.send(text_data=text)

settings.py

CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [('127.0.0.1', 6379)],
        },
    },
}

由于没有exception/error,我完全不知道这个过程的哪一部分失败了。

  1. Celery触发任务?是
  2. 任务运行发送消息到通道层?是
  3. 消费者收到群发来的消息,执行send()?是
  4. 客户端收到websocket消息?没有

这是Channels和Redis之间的问题吗?是 Channels 和客户端之间的问题吗?

事实证明,在执行任务期间,Celery 在我的代码中吞下了一个异常。我需要实施更彻底的日志记录来捕获这些异常。