在另一个消费者的一个 Django Channels 消费者中打破循环

Break loop in one Django Channels consumer from another consumer

我正在使用 Django 和使用 websockets 的 Django Channels 构建网络应用程序。

当用户单击浏览器中的按钮时,websocket 将数据发送到我的服务器,服务器上的消费者开始每秒向客户端发送一次消息(循环)。

我想创建另一个按钮来停止此数据发送过程。当用户点击这个新按钮时,websocket 将另一个数据发送到服务器,服务器上的消费者必须以某种方式停止先前消费者的循环。此外,当客户端断开连接时,我将要求它停止循环。

我很想使用全局变量。但 Django Channels 文档指出,他们强烈不建议使用全局变量,因为他们希望保持应用程序网络透明(不太了解这一点)。

我试过使用频道会话。我让第二个消费者更新通道会话中的值,但通道会话值没有在第一个消费者中更新。

这里是简化的代码。 浏览器:

var socket = new WebSocket("ws://" + window.location.host + "/socket/");
$('#button1').on('click', function() { 
    socket.send(JSON.stringify({action: 'start_getting_values'}))
});
$('#button2').on('click', function() { 
    socket.send(JSON.stringify({action: 'stop_getting_values'}))
});

服务器上的消费者:

@channel_session
def ws_message(message):
    text = json.loads(message.content['text'])

    if text['action'] == 'start_getting_values':
        while True:
            # Getting some data here
            # ...
            message.reply_channel.send({"text": some_data}, immediately=True)
            time.sleep(1)

    if text['action'] == 'stop_getting_values':
        do_something_to_stop_the_loop_above()

好吧,在联系了 Django Channels 开发人员后,我设法自己解决了这个任务。

在消费者内部创建循环的方法很糟糕,因为一旦消费者 运行 的次数等于所有工作线程的数量 运行 宁这个消费者,它就会阻塞站点。

所以我的方法如下:一旦消费者收到 'start_getting_values' 信号,它将当前回复通道添加到一个组,并在它连接的 Redis 服务器上增加值(我使用 Redis 作为通道层后端,但是它可以在任何其他后端上运行)。

它增加什么值?在 Redis 上,我有一个哈希对象类型的键说 'groups'。该键的每个键代表 Channels 中的一个组,值代表该组中回复通道的数量。

然后我创建了一个新的 python 文件,我在其中连接到同一个 Redis 服务器。在这个文件中,我 运行 无限循环从 Redis 的键 'groups' 加载字典。然后我遍历这个字典中的每个键(每个键代表频道组名称)并将数据广播到每个具有非零值的组。当我 运行 这个文件时,它是 运行 作为单独的进程,因此不会阻止消费者端的任何内容。

要停止向用户广播,当我从他那里得到适当的信号时,我只是将他从组中删除并减少适当的 Redis 值。

消费者代码:

import redis

redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)

@channel_session_user
def ws_message(message):

    text = json.loads(message.content['text'])

    if text['stream'] == 'start_getting_values':
        value_id = text['value_id']
        redis_client.hincrby('redis_some_key', value_id, 1)
        Group(value_id).add(message.reply_channel)
        channel_session['value_id'] = value_id
        return 0

    if text['stream'] == 'stop_getting_values':
        if message.channel_session['value_id'] != '':
            value_id = message.channel_session['value_id']
            Group(value_id).discard(message.reply_channel)

            l = redis_client.lock(name='del_lock')
            val = redis_client.hincrby('redis_some_key', value_id, -1)
            if (val <= 0):
                redis_client.hdel('redis_some_key', value_id)
            l.release()
        return 0

单独的 python 文件:

import time
import redis
from threading import Thread
import asgi_redis


redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
cl = asgi_redis.RedisChannelLayer()

def some_action(value_id):

    # getting some data based on value_id
    # ....

    cl.send_group(value_id, {
        "text": some_data,
    })


while True:
    value_ids = redis_client.hgetall('redis_some_key')

    ths = []
    for b_value_id in value_ids.keys():
        value_id = b_value_id.decode("utf-8")
        ths.append(Thread(target=some_action, args=(value_id,)))

    for th in ths:
        th.start()
    for th in ths:
        th.join()


    time.sleep(1)