Flask-SocketIO - 如何从子进程发出事件
Flask-SocketIO - How to emit an event from a sub-process
我有一个 Flask 应用程序,它在某些休息调用时是 运行 几个使用 ProcessPoolExecutor 的模块。
更新: 添加了 redis 作为消息队列(使用 docker,redis 作为 redis 的主机)
socketio = SocketIO(app, message_queue='redis://redis')
(...)
def emit_event(evt, message):
socketio.emit(evt, message, namespace='/test')
@app.route('/info', methods=['GET'])
def info():
emit_event('update_reports', '')
(...)
if __name__ == "__main__":
socketio.run(host='0.0.0.0', threaded=True)
现在我添加了 redis,它在从主应用程序发出时仍然有效。
这里有一些代码我是 运行 子过程:
def __init__(self):
self.executor = futures.ProcessPoolExecutor(max_workers=4)
self.socketio = SocketIO(async_mode='eventlet', message_queue='redis://redis')
(...)
future = self.executor.submit(process, params)
future.add_done_callback(functools.partial(self.finished_callback, pid))
然后在回调中调用 emit_event
方法:
def finished_callback(self, pid, future):
pid.status = Status.DONE.value
pid.finished_at = datetime.datetime.utcnow
pid.save()
self.socketio.emit('update_reports', 'done', namespace='/test')
获取和发送/发出消息 from/to 来自我的控制器的客户端工作得很好,如果我从 curl 或邮递员调用 /info,我的客户端也会收到消息 - 但是 - 当尝试发出相同的事件时从这个子进程回调中开始,现在它显示这个错误:
这主要是为了通知,比如在一个漫长的过程结束时发出通知之类的。
INFO:socketio:emitting event "update_reports" to all [/test]
ERROR:socketio:Cannot publish to redis... retrying
ERROR:socketio:Cannot publish to redis... giving up
我做错了什么?
谢谢!
在设置 Flask-SocketIO 扩展以便外部进程可以发出时需要遵循一些特定规则,其中包括使用主进程和外部进程用来协调工作的消息队列。有关说明,请参阅文档的 Emitting from an External Process 部分。
我有一个 Flask 应用程序,它在某些休息调用时是 运行 几个使用 ProcessPoolExecutor 的模块。
更新: 添加了 redis 作为消息队列(使用 docker,redis 作为 redis 的主机)
socketio = SocketIO(app, message_queue='redis://redis')
(...)
def emit_event(evt, message):
socketio.emit(evt, message, namespace='/test')
@app.route('/info', methods=['GET'])
def info():
emit_event('update_reports', '')
(...)
if __name__ == "__main__":
socketio.run(host='0.0.0.0', threaded=True)
现在我添加了 redis,它在从主应用程序发出时仍然有效。 这里有一些代码我是 运行 子过程:
def __init__(self):
self.executor = futures.ProcessPoolExecutor(max_workers=4)
self.socketio = SocketIO(async_mode='eventlet', message_queue='redis://redis')
(...)
future = self.executor.submit(process, params)
future.add_done_callback(functools.partial(self.finished_callback, pid))
然后在回调中调用 emit_event
方法:
def finished_callback(self, pid, future):
pid.status = Status.DONE.value
pid.finished_at = datetime.datetime.utcnow
pid.save()
self.socketio.emit('update_reports', 'done', namespace='/test')
获取和发送/发出消息 from/to 来自我的控制器的客户端工作得很好,如果我从 curl 或邮递员调用 /info,我的客户端也会收到消息 - 但是 - 当尝试发出相同的事件时从这个子进程回调中开始,现在它显示这个错误:
这主要是为了通知,比如在一个漫长的过程结束时发出通知之类的。
INFO:socketio:emitting event "update_reports" to all [/test]
ERROR:socketio:Cannot publish to redis... retrying
ERROR:socketio:Cannot publish to redis... giving up
我做错了什么?
谢谢!
在设置 Flask-SocketIO 扩展以便外部进程可以发出时需要遵循一些特定规则,其中包括使用主进程和外部进程用来协调工作的消息队列。有关说明,请参阅文档的 Emitting from an External Process 部分。