Flask-Socketio 不从外部 RQ 进程发出
Flask-Socketio not emitting from external RQ process
我是 运行 一个使用 Flask-Socketio 连接到 iOS 客户端的烧瓶服务器。服务器必须处理一些复杂的数据,由于这需要一段时间才能解决,所以我使用 Redis 队列在后台作业中完成。
通信正常工作,但我需要发送给客户端,并在作业完成后写入数据库,我正在尝试从作业函数中执行此操作(如果有办法让应用程序知道工作完成后,该应用程序可以在一个地方处理所有通信)。
为此,我在作业中启动了一个新的 Socketio 实例,并将其连接到 redis 队列,但我认为我做错了。
它没有崩溃,但客户端没有收到任何东西。
这是我的代码:
tasks.py
# This is the job
def engine(path, id):
result = process(path)
print(result)
socket = SocketIO(message_queue = os.environ.get('REDIS_URL'))
socket.emit('info', result)
events.py
def launch_task(name, description, *args, **kwargs):
rq_job = current_app.task_queue.enqueue('app.tasks.' + name,
*args, **kwargs)
return rq_job.get_id()
@socketio.on('File')
def got_file(file):
print("GOT FILE")
print(file[0])
name = file[0] + ".csv"
path = queue_dir + name
data = file[1]
csv = open(path, "w")
csv.write(data)
csv.close()
print(path)
launch_task("engine", "test", path, request.sid)
__init__.py
socketio = SocketIO()
def create_app(debug=False, config_class=Config):
app = Flask(__name__)
app.debug = debug
app.config.from_object(config_class)
app.redis = Redis.from_url(app.config['REDIS_URL'])
app.task_queue = rq.Queue('alg-tasks', connection=app.redis)
from .main import main as main_blueprint
app.register_blueprint(main_blueprint)
socketio.init_app(app)
return app
events.py 处理所有通信并启动 worker。
我认为我在实例化Socketio时的论点是错误的,但我不知道...关于Socketio和后台作业我还有很多不明白的地方。
提前致谢!
在应用程序上,您必须使用 app
和消息队列初始化 SocketIO
对象:
socketio.init_app(app, message_queue=os.environ.get('REDIS_URL'))
在你的 RQ worker 上你做得对,只使用了消息队列:
socket = SocketIO(message_queue=os.environ.get('REDIS_URL'))
但是每次发出时都创建一个新的 SocketIO
实例是一种资源浪费,您应该创建一个全局实例,可以在 worker 处理的多个任务中重复使用。
我是 运行 一个使用 Flask-Socketio 连接到 iOS 客户端的烧瓶服务器。服务器必须处理一些复杂的数据,由于这需要一段时间才能解决,所以我使用 Redis 队列在后台作业中完成。
通信正常工作,但我需要发送给客户端,并在作业完成后写入数据库,我正在尝试从作业函数中执行此操作(如果有办法让应用程序知道工作完成后,该应用程序可以在一个地方处理所有通信)。
为此,我在作业中启动了一个新的 Socketio 实例,并将其连接到 redis 队列,但我认为我做错了。
它没有崩溃,但客户端没有收到任何东西。
这是我的代码:
tasks.py
# This is the job
def engine(path, id):
result = process(path)
print(result)
socket = SocketIO(message_queue = os.environ.get('REDIS_URL'))
socket.emit('info', result)
events.py
def launch_task(name, description, *args, **kwargs):
rq_job = current_app.task_queue.enqueue('app.tasks.' + name,
*args, **kwargs)
return rq_job.get_id()
@socketio.on('File')
def got_file(file):
print("GOT FILE")
print(file[0])
name = file[0] + ".csv"
path = queue_dir + name
data = file[1]
csv = open(path, "w")
csv.write(data)
csv.close()
print(path)
launch_task("engine", "test", path, request.sid)
__init__.py
socketio = SocketIO()
def create_app(debug=False, config_class=Config):
app = Flask(__name__)
app.debug = debug
app.config.from_object(config_class)
app.redis = Redis.from_url(app.config['REDIS_URL'])
app.task_queue = rq.Queue('alg-tasks', connection=app.redis)
from .main import main as main_blueprint
app.register_blueprint(main_blueprint)
socketio.init_app(app)
return app
events.py 处理所有通信并启动 worker。
我认为我在实例化Socketio时的论点是错误的,但我不知道...关于Socketio和后台作业我还有很多不明白的地方。
提前致谢!
在应用程序上,您必须使用 app
和消息队列初始化 SocketIO
对象:
socketio.init_app(app, message_queue=os.environ.get('REDIS_URL'))
在你的 RQ worker 上你做得对,只使用了消息队列:
socket = SocketIO(message_queue=os.environ.get('REDIS_URL'))
但是每次发出时都创建一个新的 SocketIO
实例是一种资源浪费,您应该创建一个全局实例,可以在 worker 处理的多个任务中重复使用。