在另一个消费者的一个 Django Channels 消费者中打破循环
Break loop in one Django Channels consumer from another consumer
我正在使用 Django 和使用 websockets 的 Django Channels 构建网络应用程序。
当用户单击浏览器中的按钮时,websocket 将数据发送到我的服务器,服务器上的消费者开始每秒向客户端发送一次消息(循环)。
我想创建另一个按钮来停止此数据发送过程。当用户点击这个新按钮时,websocket 将另一个数据发送到服务器,服务器上的消费者必须以某种方式停止先前消费者的循环。此外,当客户端断开连接时,我将要求它停止循环。
我很想使用全局变量。但 Django Channels 文档指出,他们强烈不建议使用全局变量,因为他们希望保持应用程序网络透明(不太了解这一点)。
我试过使用频道会话。我让第二个消费者更新通道会话中的值,但通道会话值没有在第一个消费者中更新。
这里是简化的代码。
浏览器:
var socket = new WebSocket("ws://" + window.location.host + "/socket/");
$('#button1').on('click', function() {
socket.send(JSON.stringify({action: 'start_getting_values'}))
});
$('#button2').on('click', function() {
socket.send(JSON.stringify({action: 'stop_getting_values'}))
});
服务器上的消费者:
@channel_session
def ws_message(message):
text = json.loads(message.content['text'])
if text['action'] == 'start_getting_values':
while True:
# Getting some data here
# ...
message.reply_channel.send({"text": some_data}, immediately=True)
time.sleep(1)
if text['action'] == 'stop_getting_values':
do_something_to_stop_the_loop_above()
好吧,在联系了 Django Channels 开发人员后,我设法自己解决了这个任务。
在消费者内部创建循环的方法很糟糕,因为一旦消费者 运行 的次数等于所有工作线程的数量 运行 宁这个消费者,它就会阻塞站点。
所以我的方法如下:一旦消费者收到 'start_getting_values' 信号,它将当前回复通道添加到一个组,并在它连接的 Redis 服务器上增加值(我使用 Redis 作为通道层后端,但是它可以在任何其他后端上运行)。
它增加什么值?在 Redis 上,我有一个哈希对象类型的键说 'groups'。该键的每个键代表 Channels 中的一个组,值代表该组中回复通道的数量。
然后我创建了一个新的 python 文件,我在其中连接到同一个 Redis 服务器。在这个文件中,我 运行 无限循环从 Redis 的键 'groups' 加载字典。然后我遍历这个字典中的每个键(每个键代表频道组名称)并将数据广播到每个具有非零值的组。当我 运行 这个文件时,它是 运行 作为单独的进程,因此不会阻止消费者端的任何内容。
要停止向用户广播,当我从他那里得到适当的信号时,我只是将他从组中删除并减少适当的 Redis 值。
消费者代码:
import redis
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
@channel_session_user
def ws_message(message):
text = json.loads(message.content['text'])
if text['stream'] == 'start_getting_values':
value_id = text['value_id']
redis_client.hincrby('redis_some_key', value_id, 1)
Group(value_id).add(message.reply_channel)
channel_session['value_id'] = value_id
return 0
if text['stream'] == 'stop_getting_values':
if message.channel_session['value_id'] != '':
value_id = message.channel_session['value_id']
Group(value_id).discard(message.reply_channel)
l = redis_client.lock(name='del_lock')
val = redis_client.hincrby('redis_some_key', value_id, -1)
if (val <= 0):
redis_client.hdel('redis_some_key', value_id)
l.release()
return 0
单独的 python 文件:
import time
import redis
from threading import Thread
import asgi_redis
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
cl = asgi_redis.RedisChannelLayer()
def some_action(value_id):
# getting some data based on value_id
# ....
cl.send_group(value_id, {
"text": some_data,
})
while True:
value_ids = redis_client.hgetall('redis_some_key')
ths = []
for b_value_id in value_ids.keys():
value_id = b_value_id.decode("utf-8")
ths.append(Thread(target=some_action, args=(value_id,)))
for th in ths:
th.start()
for th in ths:
th.join()
time.sleep(1)
我正在使用 Django 和使用 websockets 的 Django Channels 构建网络应用程序。
当用户单击浏览器中的按钮时,websocket 将数据发送到我的服务器,服务器上的消费者开始每秒向客户端发送一次消息(循环)。
我想创建另一个按钮来停止此数据发送过程。当用户点击这个新按钮时,websocket 将另一个数据发送到服务器,服务器上的消费者必须以某种方式停止先前消费者的循环。此外,当客户端断开连接时,我将要求它停止循环。
我很想使用全局变量。但 Django Channels 文档指出,他们强烈不建议使用全局变量,因为他们希望保持应用程序网络透明(不太了解这一点)。
我试过使用频道会话。我让第二个消费者更新通道会话中的值,但通道会话值没有在第一个消费者中更新。
这里是简化的代码。 浏览器:
var socket = new WebSocket("ws://" + window.location.host + "/socket/");
$('#button1').on('click', function() {
socket.send(JSON.stringify({action: 'start_getting_values'}))
});
$('#button2').on('click', function() {
socket.send(JSON.stringify({action: 'stop_getting_values'}))
});
服务器上的消费者:
@channel_session
def ws_message(message):
text = json.loads(message.content['text'])
if text['action'] == 'start_getting_values':
while True:
# Getting some data here
# ...
message.reply_channel.send({"text": some_data}, immediately=True)
time.sleep(1)
if text['action'] == 'stop_getting_values':
do_something_to_stop_the_loop_above()
好吧,在联系了 Django Channels 开发人员后,我设法自己解决了这个任务。
在消费者内部创建循环的方法很糟糕,因为一旦消费者 运行 的次数等于所有工作线程的数量 运行 宁这个消费者,它就会阻塞站点。
所以我的方法如下:一旦消费者收到 'start_getting_values' 信号,它将当前回复通道添加到一个组,并在它连接的 Redis 服务器上增加值(我使用 Redis 作为通道层后端,但是它可以在任何其他后端上运行)。
它增加什么值?在 Redis 上,我有一个哈希对象类型的键说 'groups'。该键的每个键代表 Channels 中的一个组,值代表该组中回复通道的数量。
然后我创建了一个新的 python 文件,我在其中连接到同一个 Redis 服务器。在这个文件中,我 运行 无限循环从 Redis 的键 'groups' 加载字典。然后我遍历这个字典中的每个键(每个键代表频道组名称)并将数据广播到每个具有非零值的组。当我 运行 这个文件时,它是 运行 作为单独的进程,因此不会阻止消费者端的任何内容。
要停止向用户广播,当我从他那里得到适当的信号时,我只是将他从组中删除并减少适当的 Redis 值。
消费者代码:
import redis
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
@channel_session_user
def ws_message(message):
text = json.loads(message.content['text'])
if text['stream'] == 'start_getting_values':
value_id = text['value_id']
redis_client.hincrby('redis_some_key', value_id, 1)
Group(value_id).add(message.reply_channel)
channel_session['value_id'] = value_id
return 0
if text['stream'] == 'stop_getting_values':
if message.channel_session['value_id'] != '':
value_id = message.channel_session['value_id']
Group(value_id).discard(message.reply_channel)
l = redis_client.lock(name='del_lock')
val = redis_client.hincrby('redis_some_key', value_id, -1)
if (val <= 0):
redis_client.hdel('redis_some_key', value_id)
l.release()
return 0
单独的 python 文件:
import time
import redis
from threading import Thread
import asgi_redis
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
cl = asgi_redis.RedisChannelLayer()
def some_action(value_id):
# getting some data based on value_id
# ....
cl.send_group(value_id, {
"text": some_data,
})
while True:
value_ids = redis_client.hgetall('redis_some_key')
ths = []
for b_value_id in value_ids.keys():
value_id = b_value_id.decode("utf-8")
ths.append(Thread(target=some_action, args=(value_id,)))
for th in ths:
th.start()
for th in ths:
th.join()
time.sleep(1)