Flask socketio 多线程转储字典排队并发送给客户端
Flask socketio multithread dump dictionary to queue and send to clients
目标只是能够创建字典的线程队列并将它们报告给客户端。
编辑
这与 Flask throwing 'working outside of request context' when starting sub thread 不同,因为:
不是在路由函数中完成的,是在socketio.start_background_task
中完成的
唯一的 socketio 代码发生在上下文中,socketio.emit 我们正在发送字典。
策略:
在服务器端有 2 个不同的任务要执行,每个任务构建一个线程,然后在另一个 socketio 线程中收集结果,这些结果在字典的线程安全队列 FIFO 中。
然后将这些字典发送给客户端并等待每个确认。
所以现在问题减少到解决:
RuntimeError: Working outside of request context.
from flask import Flask, flash, request, redirect, render_template, Response, escape, jsonify, url_for, session, copy_current_request_context
#socketio
from flask_socketio import SocketIO, send, emit, join_room, leave_room, close_room, rooms, disconnect
import threading
from threading import Thread, Event, Lock
import queue
import random
def ack(value):
if value != 'pong':
logger.info('unexpected return value')
def fn_i():
global q
while True:
time.sleep(1)
q.put({'key_i':random.random()})
return q
def fn_ii():
global q
while True:
time.sleep(10)
q.put({'key_ii':random.random()})
return q
app = Flask(__name__)
socketio = SocketIO(app, async_mode=async_mode)
thread1=None
thread2=None
collector_thread=None
q = queue.Queue()
thread_lock = Lock()
def background_thread_collector():
global thread1
global thread2
global q
thread1 = threading.Thread(target=fn_i)
thread1.start()
thread2 = threading.Thread(target=fn_ii)
thread2.start()
"""Example of how to send server generated events to clients."""
while True:
time.sleep(0.2)
while not q.empty():
socketio.emit('my_response',
q.get(), #{'data': 'Server generated event', 'count': count},
namespace='/test',
broadcast=True,
callback=ack
)
@app.route('/')
def index():
return render_template('index.html', async_mode=socketio.async_mode)
@socketio.on('connect', namespace='/test')
def test_connect():
global collector_thread
logger.info(' Client connected ' + request.sid)
with thread_lock:
if collector_thread is None:
collector_thread = socketio.start_background_task(background_thread_collector)
emit('my_response', {'data': 'Connected', 'count': 0})
if __name__ == '__main__':
socketio.run(app,
host='localhost',
port=10000,
debug=False) #True sends some exceptions and stops)
干杯
这应该由 Flask-SocketIO 更好地处理,但问题是您正试图在设置为广播给所有客户端的发射上使用回调:
socketio.emit('my_response',
q.get(), #{'data': 'Server generated event', 'count': count},
namespace='/test',
broadcast=True,
callback=ack
)
删除回调,发射应该会正常工作。
在 Flasksocketio 中使用线程队列并不简单,因为需要处理应用程序上下文,为了在客户端刷新服务器日志文件,发现在这种情况下简单地使用 javascript 更容易并且文件可以相应地更新。由于应用程序上下文的原因,即使是带有建议更改的前代码也无法正常工作,但是 none 中的多个博客或 Whosebug 方法主题已经找到了一个完整的工作解决方案,除了像解释一样实施,因为任何其他答案都需要完整的代码,这是工作,因此考虑到这是公认的答案,因此能够在这里实施,干杯。
目标只是能够创建字典的线程队列并将它们报告给客户端。
编辑 这与 Flask throwing 'working outside of request context' when starting sub thread 不同,因为:
不是在路由函数中完成的,是在socketio.start_background_task
中完成的唯一的 socketio 代码发生在上下文中,socketio.emit 我们正在发送字典。
策略: 在服务器端有 2 个不同的任务要执行,每个任务构建一个线程,然后在另一个 socketio 线程中收集结果,这些结果在字典的线程安全队列 FIFO 中。
然后将这些字典发送给客户端并等待每个确认。
所以现在问题减少到解决:
RuntimeError: Working outside of request context.
from flask import Flask, flash, request, redirect, render_template, Response, escape, jsonify, url_for, session, copy_current_request_context
#socketio
from flask_socketio import SocketIO, send, emit, join_room, leave_room, close_room, rooms, disconnect
import threading
from threading import Thread, Event, Lock
import queue
import random
def ack(value):
if value != 'pong':
logger.info('unexpected return value')
def fn_i():
global q
while True:
time.sleep(1)
q.put({'key_i':random.random()})
return q
def fn_ii():
global q
while True:
time.sleep(10)
q.put({'key_ii':random.random()})
return q
app = Flask(__name__)
socketio = SocketIO(app, async_mode=async_mode)
thread1=None
thread2=None
collector_thread=None
q = queue.Queue()
thread_lock = Lock()
def background_thread_collector():
global thread1
global thread2
global q
thread1 = threading.Thread(target=fn_i)
thread1.start()
thread2 = threading.Thread(target=fn_ii)
thread2.start()
"""Example of how to send server generated events to clients."""
while True:
time.sleep(0.2)
while not q.empty():
socketio.emit('my_response',
q.get(), #{'data': 'Server generated event', 'count': count},
namespace='/test',
broadcast=True,
callback=ack
)
@app.route('/')
def index():
return render_template('index.html', async_mode=socketio.async_mode)
@socketio.on('connect', namespace='/test')
def test_connect():
global collector_thread
logger.info(' Client connected ' + request.sid)
with thread_lock:
if collector_thread is None:
collector_thread = socketio.start_background_task(background_thread_collector)
emit('my_response', {'data': 'Connected', 'count': 0})
if __name__ == '__main__':
socketio.run(app,
host='localhost',
port=10000,
debug=False) #True sends some exceptions and stops)
干杯
这应该由 Flask-SocketIO 更好地处理,但问题是您正试图在设置为广播给所有客户端的发射上使用回调:
socketio.emit('my_response',
q.get(), #{'data': 'Server generated event', 'count': count},
namespace='/test',
broadcast=True,
callback=ack
)
删除回调,发射应该会正常工作。
在 Flasksocketio 中使用线程队列并不简单,因为需要处理应用程序上下文,为了在客户端刷新服务器日志文件,发现在这种情况下简单地使用 javascript 更容易并且文件可以相应地更新。由于应用程序上下文的原因,即使是带有建议更改的前代码也无法正常工作,但是 none 中的多个博客或 Whosebug 方法主题已经找到了一个完整的工作解决方案,除了像解释一样实施,因为任何其他答案都需要完整的代码,这是工作,因此考虑到这是公认的答案,因此能够在这里实施,干杯。