从 Celery 任务发送事件 - Flask-SocketIO + Celery
Sending events from Celery task - Flask-SocketIO + Celery
我正在尝试将事件从 Celery 任务发送到服务器,这样我就可以将数据发送到客户端。
- 用户提交了带有文本的表单
- 服务器接收表单提交
- 服务器将表单数据发送到Celery任务队列
- Celery 处理任务
- Celery 将处理后的数据发送回服务器
- 服务器端将数据发送给客户端,并显示在网页上
这是我目前拥有的:
routes.py
@main_bp.route('/', methods=['GET', 'POST'])
def index():
form = UserTextForm()
if request.method == 'POST':
request_data = request.form.to_dict()
user_text = request_data['user_text']
run_processing_task.apply_async(args=[user_text])
return render_template('index.html', form=form)
@socketio.on('my_event', namespace='/user_text_task_results')
def user_test_task_results(my_event):
# Sends data to client
send(my_event, broadcast=True)
celery_tasks.py
@celery.task(bind=True)
def run_processing_task(self, user_text):
print('Celery Function kicked off')
local_socketio = SocketIO(message_queue='redis://localhost:6379/0')
time.sleep(3)
user_text_reverse = user_text[::-1]
local_socketio.emit('my_event', {'data': user_text_reverse}, namespace='/user_text_task_results')
print('Task Completed')
return 'Finished'
index.html
<head>
<link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css" integrity="sha384-ggOyR0iXCbMQv3Xipma34MD+dH/1fQ784/j6cY/iJTQUOhcWr7x9JvoRxT2MZw1T" crossorigin="anonymous">
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.min.js"></script>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
<h1>Welcome</h1>
</head>
<form method="POST" action="{{ url_for('main_bp.index') }}" class="col-6">
{{ form.hidden_tag() }}
<div class="form-group">
{{ form.user_text(class="form-control", id="user-text-area") }}
</div>
<button type="submit" class="btn btn-primary">Submit</button>
</form>
<div id="test-app">
</div>
<script>
$(document).ready(function() {
var socket = io.connect('http://127.0.0.1:5000');
socket.on('connect', function() {
socket.send('User Connected')
});
socket.on('message', function(results){
var reversed_text = results['data']
$("$test-app").append('<p>'+reversed_text+</p>)
});
});
</script>
当我运行上面的时候,任务处理完成了,但是我认为服务器没有收到数据local_socketio.emit(...)
。我可以看到 Celery 任务成功完成,但我没有看到任何迹象表明数据正在流向服务器并最终流向客户端。
我尝试安装 gevent 和 gevent-websocket,但是索引不会为初始 GET 请求加载事件(它只是挂在加载状态)
Celery 工作人员通常 运行 使用与 Flask 应用程序相同的代码,但它们不会 运行 用作 Flask 服务器,因此从 Celery 到 Flask 的 websockets 并不是一件容易的事。 (我从未见过它完成,但也许有人解决了棘手的部分。)
假设您希望避免让客户端或应用程序轮询任务完成,另一种方法是让工作人员通过向应用程序发出 HTTP 请求来发出完成信号。即,类似于 POST 到
/tasks/complete/<task_id>
那么这是一个应用程序问题,已完成簿记以将 task_id
与特定的 websocket 相关联。
添加:
Celery 有一个 post_run 信号应该用于相同的目的。几年前我运气不好,但现在我觉得我做了一些愚蠢的事情。
我正在尝试将事件从 Celery 任务发送到服务器,这样我就可以将数据发送到客户端。
- 用户提交了带有文本的表单
- 服务器接收表单提交
- 服务器将表单数据发送到Celery任务队列
- Celery 处理任务
- Celery 将处理后的数据发送回服务器
- 服务器端将数据发送给客户端,并显示在网页上
这是我目前拥有的:
routes.py
@main_bp.route('/', methods=['GET', 'POST'])
def index():
form = UserTextForm()
if request.method == 'POST':
request_data = request.form.to_dict()
user_text = request_data['user_text']
run_processing_task.apply_async(args=[user_text])
return render_template('index.html', form=form)
@socketio.on('my_event', namespace='/user_text_task_results')
def user_test_task_results(my_event):
# Sends data to client
send(my_event, broadcast=True)
celery_tasks.py
@celery.task(bind=True)
def run_processing_task(self, user_text):
print('Celery Function kicked off')
local_socketio = SocketIO(message_queue='redis://localhost:6379/0')
time.sleep(3)
user_text_reverse = user_text[::-1]
local_socketio.emit('my_event', {'data': user_text_reverse}, namespace='/user_text_task_results')
print('Task Completed')
return 'Finished'
index.html
<head>
<link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css" integrity="sha384-ggOyR0iXCbMQv3Xipma34MD+dH/1fQ784/j6cY/iJTQUOhcWr7x9JvoRxT2MZw1T" crossorigin="anonymous">
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.min.js"></script>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>
<h1>Welcome</h1>
</head>
<form method="POST" action="{{ url_for('main_bp.index') }}" class="col-6">
{{ form.hidden_tag() }}
<div class="form-group">
{{ form.user_text(class="form-control", id="user-text-area") }}
</div>
<button type="submit" class="btn btn-primary">Submit</button>
</form>
<div id="test-app">
</div>
<script>
$(document).ready(function() {
var socket = io.connect('http://127.0.0.1:5000');
socket.on('connect', function() {
socket.send('User Connected')
});
socket.on('message', function(results){
var reversed_text = results['data']
$("$test-app").append('<p>'+reversed_text+</p>)
});
});
</script>
当我运行上面的时候,任务处理完成了,但是我认为服务器没有收到数据local_socketio.emit(...)
。我可以看到 Celery 任务成功完成,但我没有看到任何迹象表明数据正在流向服务器并最终流向客户端。
我尝试安装 gevent 和 gevent-websocket,但是索引不会为初始 GET 请求加载事件(它只是挂在加载状态)
Celery 工作人员通常 运行 使用与 Flask 应用程序相同的代码,但它们不会 运行 用作 Flask 服务器,因此从 Celery 到 Flask 的 websockets 并不是一件容易的事。 (我从未见过它完成,但也许有人解决了棘手的部分。)
假设您希望避免让客户端或应用程序轮询任务完成,另一种方法是让工作人员通过向应用程序发出 HTTP 请求来发出完成信号。即,类似于 POST 到
/tasks/complete/<task_id>
那么这是一个应用程序问题,已完成簿记以将 task_id
与特定的 websocket 相关联。
添加:
Celery 有一个 post_run 信号应该用于相同的目的。几年前我运气不好,但现在我觉得我做了一些愚蠢的事情。