从 Celery 任务发送事件 - Flask-SocketIO + Celery

Sending events from Celery task - Flask-SocketIO + Celery

我正在尝试将事件从 Celery 任务发送到服务器,这样我就可以将数据发送到客户端。

  1. 用户提交了带有文本的表单
  2. 服务器接收表单提交
  3. 服务器将表单数据发送到Celery任务队列
  4. Celery 处理任务
  5. Celery 将处理后的数据发送回服务器
  6. 服务器端将数据发送给客户端,并显示在网页上

这是我目前拥有的:

routes.py

@main_bp.route('/', methods=['GET', 'POST'])
def index():
    form = UserTextForm()
    if request.method == 'POST':
        request_data = request.form.to_dict()

        user_text = request_data['user_text']

        run_processing_task.apply_async(args=[user_text])

    return render_template('index.html', form=form)

@socketio.on('my_event', namespace='/user_text_task_results')
def user_test_task_results(my_event):
    # Sends data to client
    send(my_event, broadcast=True)
celery_tasks.py

@celery.task(bind=True)
def run_processing_task(self, user_text):
    print('Celery Function kicked off')

    local_socketio = SocketIO(message_queue='redis://localhost:6379/0')

    time.sleep(3)
    user_text_reverse = user_text[::-1]
    local_socketio.emit('my_event', {'data': user_text_reverse}, namespace='/user_text_task_results')

    print('Task Completed')

    return 'Finished'
index.html

<head>
    <link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css" integrity="sha384-ggOyR0iXCbMQv3Xipma34MD+dH/1fQ784/j6cY/iJTQUOhcWr7x9JvoRxT2MZw1T" crossorigin="anonymous">
    <script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.min.js"></script>
    <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script>

    <h1>Welcome</h1>
</head>


<form method="POST" action="{{ url_for('main_bp.index') }}" class="col-6">
  {{ form.hidden_tag() }}
    <div class="form-group">
    {{ form.user_text(class="form-control", id="user-text-area") }}
  </div>
  <button type="submit" class="btn btn-primary">Submit</button>
</form>

<div id="test-app">
</div>

<script>
    $(document).ready(function() {
        var socket = io.connect('http://127.0.0.1:5000');

        socket.on('connect', function() {
            socket.send('User Connected')
        });

        socket.on('message', function(results){
            var reversed_text = results['data']
            $("$test-app").append('<p>'+reversed_text+</p>)
        });
    });
</script>

当我运行上面的时候,任务处理完成了,但是我认为服务器没有收到数据local_socketio.emit(...)。我可以看到 Celery 任务成功完成,但我没有看到任何迹象表明数据正在流向服务器并最终流向客户端。

我尝试安装 gevent 和 gevent-websocket,但是索引不会为初始 GET 请求加载事件(它只是挂在加载状态)

Celery 工作人员通常 运行 使用与 Flask 应用程序相同的代码,但它们不会 运行 用作 Flask 服务器,因此从 Celery 到 Flask 的 websockets 并不是一件容易的事。 (我从未见过它完成,但也许有人解决了棘手的部分。)

假设您希望避免让客户端或应用程序轮询任务完成,另一种方法是让工作人员通过向应用程序发出 HTTP 请求来发出完成信号。即,类似于 POST 到

/tasks/complete/<task_id>

那么这是一个应用程序问题,已完成簿记以将 task_id 与特定的 websocket 相关联。

添加:

Celery 有一个 post_run 信号应该用于相同的目的。几年前我运气不好,但现在我觉得我做了一些愚蠢的事情。