Websocket asynchronous feedback during a long process

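I am trying to implement some feedback in a web page for a user who starts a long process from an Excel sheet (sigh, yes...). Each row of data takes about 1 second to process, and a typical sheet has between 40 and 100 rows, so the overall processing time can exceed a minute.

I display a preview of the data in the page, start the process through a websocket, and would like to show the progress over the same websocket.

The processing itself is done by an external package and the page is very simple, so I have wrapped everything in a single Mojolicious::Lite file.

My problem is that the long processing started in the websocket route blocks the feedback until it has finished, and all the progress events are sent at once at the end. As far as I understand, this is related to the Mojolicious event loop, and I should start the processing separately to avoid freezing the websocket handling.

Note that I have also tried a separate feedback channel using EventSource to push progress to the client during the processing, but it shows the same behavior: everything arrives at once at the end.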

Here is my code, simplified; I am using sleep() to simulate the long process. I start it with

perl mojo_notify_ws.pl daemon

Can you suggest how to modify the websocket route to allow real-time feedback?

use Mojolicious::Lite;
use Mojo::JSON qw(encode_json decode_json j);

use Data::Dumper;

$|++;

any '/' => sub {
    my $c = shift;
    $c->render('index');
};

my $peer;
websocket '/go' => sub {
    use Carp::Always;
    my $ws = shift;

    $peer = $ws->tx;
    app->log->debug(sprintf 'Client connected: %s', Dumper $peer->remote_address);

    # do not subscribe to 'text' else 'json' won't work
    #$ws->on(text => sub {
    #    my ($ws, $msg) = @_;
    #    app->log->debug("Received text from websocket: `$msg`");
    #        });

    # $peer->send('{"type": "test"}');
    # say 'default inactivity timeout='. (p $ws->inactivity_timeout());
    $ws->inactivity_timeout(120);

    $ws->on(json => sub {
        my ($ws, $msg) = @_;
        app->log->debug('Received from websocket:', Dumper($msg));
        unless($msg){
            app->log->debug('Received empty message? WTF?!');
            return;
        }
        my $prompt = $msg->{cmd};
        return unless $prompt;
        app->log->debug(sprintf 'Received: `%s`', $prompt // 'empty??');

        # simulate
        my $loop = Mojo::IOLoop->singleton;

#        $loop->subprocess( sub {
#            my $sp = shift;

        for my $cell (1..3) {
            # $loop->delay( sub {
                app->log->debug("sending cell $cell");
                my $payload = {
                        type => 'ticket',
                        cell => $cell,
                        result => $cell % 2 ? 'OK' : 'NOK'
                };
                $ws->send( { json => $payload } );
                sleep(2);
                # $loop->timer(2, sub {say 'we have waited 2 secs!';})->wait;
            # });
        };

#        }, sub {} );#subprocess

        app->log->debug('sending end of process ->websocket');
        $ws->send({json => { type => 'end' } });
    });

    $ws->on(finish => sub {
        my ($ws, $code, $reason) = @_;
        $reason = '' unless defined $reason;
        app->log->debug("Client disconnected: $code ($reason)");
    });

    app->log->debug('Reached end of ws route definition');
};

app->start;

__DATA__

@@ index.html.ep
<html>
    <head>
    <script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.js"></script>
    <script>
var timerId = 0; // same spelling as used in keepAlive() and cancelKeepAlive()
function keepAlive(ws) { 
    var timeout = 20000;  
    if (ws.readyState == ws.OPEN) {  
        ws.send('ping');  
    }  
    timerId = setTimeout(function(){keepAlive(ws);}, timeout);  
}  
function cancelKeepAlive() {  
    if (timerId) {  
        clearTimeout(timerId);  
    }  
}

function flagCell(cell, result){
    var id='#CELL_' + cell;
    var cell = $(id);
    if(cell) {
        if (result=='OK') {
            cell.css('color', 'green');
            cell.text('⯲');
        } else {
            cell.css('color','red');
            cell.text('✘');
        }
    }
}

function process(){
    //debugger;
    console.log('Opening WebSocket');
    var ws = new WebSocket('<%= url_for('go')->to_abs %>');

    ws.onopen = function (){
        console.log('Websocket Open');
        //keepAlive(ws);
        ws.send(JSON.stringify({cmd: "let's go Perl"}));
    };
    //incoming
    ws.onmessage = function(evt){
        var data = JSON.parse(evt.data);
        console.log('WS received '+JSON.stringify(data));
        if (data.type == 'ticket') {
            console.log('Server has send a status');
            console.log('Cell:'+data.cell + ' res:' + data.result);

            flagCell(data.cell, data.result);
        } else if (data.type == 'end') {
            console.log('Server has finished.');
            //cancelKeepAlive();
            ws.close();
        } else {
            console.log('Unknown message:' + evt.data);
        }
    };
    ws.onerror = function (evt) {
        console.log('ws error:', evt.data);
    }
    ws.onclose = function (evt) {
        if(evt.wasClean) {
            console.log('Connection closed cleanly');
        } else {
            console.log('Connection reseted');
        }
        console.log('Code:'+ evt.code + ' Reason:' + evt.reason);
    }
}

    </script>
    </head>
    <body>
        <button type=button id='upload' onclick="process();">Go</button><br>
        <div style='font-family:sans;'>
            <table border="1px">
              <tr><td id="CELL_1">&nbsp;</td><td>Foo</td></tr>
              <tr><td id="CELL_2">&nbsp;</td><td>Bar</td></tr>
              <tr><td id="CELL_3">&nbsp;</td><td>Baz</td></tr>
            </table>
        </div>
    </body>
</html>

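Edit:

Grinnz has provided a suitable solution, but for the record, here is my attempt using a Mojo::IOLoop::Subprocess callback, with which I got no feedback at all. I am running on Linux; Subprocess seems to fork, and the parent process seems to terminate the websocket immediately.

Edit: no, that was wrong. I finally found that $ws->send() was in the wrong place: it should be in the second sub {}, which runs on the parent side, and not in the first one, which runs in the child process. This code should be refactored to use one subprocess per loop iteration, plus a final step for the end notification.

Here is the modified on(json):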

$ws->on(json => sub {
    my ($ws, $msg) = @_;
    app->log->debug('Received from websocket:', Dumper($msg));
    unless($msg){
        app->log->debug('Received empty message? WTF?!');
        return;
    }
    my $prompt = $msg->{cmd};
    return unless $prompt;
    app->log->debug(sprintf 'Received: `%s`', $prompt // '<empty??>');

    # my $loop = Mojo::IOLoop->singleton;
    my $subprocess = Mojo::IOLoop::Subprocess->new;
    app->log->debug("we are pid $$");
    $subprocess->run( 
        sub {
            my $sp = shift;
            for my $cell (1..3) {
                app->log->debug("starting process for cell $cell in pid $$");     
                sleep(2);
                app->log->debug("sending cell $cell to ws");
                my $payload = {
                    type => 'ticket',
                    cell => $cell,
                    result => $cell % 2 ? 'OK' : 'NOK'
                };
                $ws->send( { json => $payload } ); # FIXME: actually this line is in the wrong place
                                                   # and should be in the second sub{}
            };
        },
        sub {
            my ($sp, $err, @results) = @_; 
            $ws->reply->exception($err) and return if $err;
            app->log->debug('sending end of process ->websocket');
            $ws->send({json => { type => 'end' } });
        });  
    # Start event loop if necessary
    $subprocess->ioloop->start unless $subprocess->ioloop->is_running;       
});

And the corresponding log:

[Wed Oct  3 19:51:58 2018] [debug] Received: `let's go Perl`
[Wed Oct  3 19:51:58 2018] [debug] we are pid 8898
[Wed Oct  3 19:51:58 2018] [debug] Client disconnected: 1006 ()
[Wed Oct  3 19:51:58 2018] [debug] starting process for cell 1 in pid 8915
[Wed Oct  3 19:52:00 2018] [debug] sending cell 1 to ws
[Wed Oct  3 19:52:00 2018] [debug] starting process for cell 2 in pid 8915
[Wed Oct  3 19:52:02 2018] [debug] sending cell 2 to ws
[Wed Oct  3 19:52:02 2018] [debug] starting process for cell 3 in pid 8915
[Wed Oct  3 19:52:04 2018] [debug] sending cell 3 to ws
[Wed Oct  3 19:52:04 2018] [debug] sending end of process ->websocket
[Wed Oct  3 19:52:04 2018] [debug] Client disconnected: 1005 ()

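I also experimented with Mojo::IOLoop->delay to build a more complicated sequence of steps, in a way similar to the Promise solution, but this version also sends all notifications at once at the end: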

$ws->on(json => sub {
    my ($ws, $msg) = @_;
    app->log->debug('Received from websocket:', Dumper($msg));
    unless($msg){
        app->log->debug('Received empty message? WTF?!');
        return;
    }
    my $prompt = $msg->{cmd};
    return unless $prompt;
    app->log->debug(sprintf 'Received: `%s`', $prompt // '<empty??>');

    app->log->debug("we are pid $$");

    my @steps;
    for my $cell (1..3) {
        push @steps, 
            sub {
                app->log->debug("subprocess for cell pid $cell");
                # my $sp = shift;
                my $delay = shift;
                sleep(2);
                app->log->debug("end of sleep for cell $cell");
                $delay->pass($cell % 2 ? 'OK' : 'NOK');
            },
            sub {
                my $delay = shift;
                my $result = shift;

                app->log->debug("sending cell $cell from pid $$ - result was $result");
                my $payload = {
                    type => 'ticket',
                    cell => $cell,
                    result => $result
                };
                $ws->send( { json => $payload } );
                $delay->pass;
            };
    }

    # add final step to notify end of processing
    push @steps, sub {
        my $delay = shift;
        app->log->debug('sending end of process ->websocket');
        $ws->send({json => { type => 'end' } });
        $delay->pass;
    };

    my $delay = Mojo::IOLoop::Delay->new;
    app->log->debug("Starting delay...");
    $delay->steps( @steps );
    app->log->debug("After the delay");

});

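You can use a thread instead of a subprocess to do the work. After creating the thread, you need a loop that updates the progress over the websocket.

If you are handling a critical workload that must finish under all circumstances (websocket gone, network down, etc.), you should delegate it to a separate, persistent daemon that communicates its state via a file or a socket.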
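As a rough illustration of the "state via a file" idea, here is a minimal sketch using only core modules. The helper names write_state and read_state, the file name, and the done/total keys are all made up for this example; a real daemon would pick its own location and format.

```perl
use strict;
use warnings;
use JSON::PP qw(encode_json decode_json);
use File::Basename qw(dirname);
use File::Spec;
use File::Temp qw(tempfile);

# Write the state atomically: write to a temp file in the same directory,
# then rename it over the target so a reader never sees a half-written file.
sub write_state {
    my ($path, $state) = @_;
    my ($fh, $tmp) = tempfile('state-XXXXXX', DIR => dirname($path));
    print {$fh} encode_json($state);
    close $fh or die "close $tmp: $!";
    rename $tmp, $path or die "rename $tmp -> $path: $!";
    return;
}

# Read the latest state, or undef if nothing has been written yet.
sub read_state {
    my ($path) = @_;
    open my $fh, '<', $path or return undef;
    local $/;    # slurp the whole file
    my $json = <$fh>;
    close $fh;
    return decode_json($json);
}

# Daemon side (hypothetical): call write_state after each processed row.
my $state_file = File::Spec->catfile(File::Spec->tmpdir, "job-$$.json");
write_state($state_file, { done => 12, total => 60 });

# Web side: read the state and forward it to the client.
my $state = read_state($state_file);
printf "%d/%d rows processed\n", @{$state}{qw(done total)};
unlink $state_file;
```

In the web application, read_state would typically be called from a recurring timer rather than a blocking loop, so that the event loop stays free to push each update over the websocket.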

If it is a non-critical workload that you can easily restart, the following may serve as a template.

# Insert this at module header
# use threads;
# use Thread::Queue;

my $queue  = Thread::Queue->new();
my $worker = threads->create(sub {
  # dummy workload. do your work here
  my $count = 60;
  for (1..$count) {
    sleep 1;
    $queue->enqueue($_/$count);
  }

  # undef to signal end of work
  $queue->enqueue(undef);

  return;
});

# blocking dequeuing ends when retrieving an undef'd value
while(defined(my $item = $queue->dequeue)) {
  # update progress via websocket
  printf("%.1f %%\n", 100 * $item); # $item is a fraction between 0 and 1
}

# join thread
$worker->join;

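It is not possible to magically make Perl code non-blocking. That is why your blocking operation is holding up the websocket responses and the event loop.

A single subprocess will not work for this, because only the original worker process that handled the request can respond on the websocket, and a subprocess can only return once. However, you can use a subprocess to prepare each response to be sent. Your use of subprocesses is not quite correct, though.

The first subroutine passed to subprocess is executed in a fork and thus does not block the main process. The second subroutine is executed in the parent once the subprocess completes, and receives the return value of the first subroutine. This is where you need to send your responses.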
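To make the fork/parent split concrete, here is a minimal standalone sketch, separate from the application code. It assumes Mojolicious is installed; the keys child_pid and answer are made up for illustration. It records which process runs each callback:

```perl
use strict;
use warnings;
use Mojo::IOLoop;

my $parent_pid = $$;
my %seen;

Mojo::IOLoop->subprocess(
    sub {
        my $subprocess = shift;
        sleep 1;    # blocking work is fine here, it runs in the fork
        return { child_pid => $$, answer => 42 };
    },
    sub {
        my ($subprocess, $err, $result) = @_;
        # Runs in the parent once the fork has exited;
        # this is the only place that can talk to the websocket.
        %seen = (err => $err, callback_pid => $$, %{ $result // {} });
        Mojo::IOLoop->stop;
    }
);

# Only reached before the fork is even started: the subprocess is launched
# when control returns to the event loop.
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

printf "parent pid %d, work ran in pid %d, callback ran in pid %d\n",
    $parent_pid, $seen{child_pid}, $seen{callback_pid};
```

The work runs under a different pid than the parent, while the second callback runs in the parent itself, which is why a send issued from the first subroutine never reaches the client.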

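Any code outside of these callbacks is executed before the subprocess has even started, because this is asynchronous code; you need to sequence the logic through callbacks. You can use promises to make complicated sequencing simpler.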

use Mojo::Promise;

$ws->on(json => sub {
    my ($ws, $msg) = @_;
    app->log->debug('Received from websocket:', Dumper($msg));
    unless($msg){
        app->log->debug('Received empty message? WTF?!');
        return;
    }
    my $prompt = $msg->{cmd};
    return unless $prompt;
    app->log->debug(sprintf 'Received: `%s`', $prompt // 'empty??');

    my $promise = Mojo::Promise->new->resolve; # starting point
    # attach follow-up code for each cell, returning a new promise representing the whole chain so far
    for my $cell (1..3) {
        $promise = $promise->then(sub {
            my $promise = Mojo::Promise->new;
            Mojo::IOLoop->subprocess(sub {
                app->log->debug("sending cell $cell");
                sleep(2);
                my $payload = {
                        type => 'ticket',
                        cell => $cell,
                        result => $cell % 2 ? 'OK' : 'NOK'
                };
                return $payload;
            }, sub {
                my ($sp, $err, $payload) = @_;
                return $promise->reject($err) if $err; # indicates subprocess died
                $ws->send( { json => $payload }, sub { $promise->resolve } );
            });

            # here, the subprocess has not been started yet
            # it will be started when this handler returns to the event loop
            # then the second callback will run once the subprocess exits
            return $promise;
        });
    }

    # chain from last promise
    $promise->then(sub {
        app->log->debug('sending end of process ->websocket');
        $ws->send({json => { type => 'end' } });
    })->catch(sub {
        my $err = shift;
        # you can send or log something here to indicate an error occurred in one of the subprocesses
    });
});

Some other options, which I can go into in more detail if they would be appropriate: Mojo::IOLoop::ReadWriteFork, which would let you start just one subprocess and continuously receive STDOUT from it (you would need to serialize your payload yourself to send it on STDOUT, for example with Mojo::JSON); or a regular subprocess that sends status information back to the parent over an external pub/sub broker that both processes can connect to, such as Postgres, Redis, or Mercury (this also requires serialization).
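To illustrate the "one subprocess, serialized payloads on a pipe" idea without relying on any particular module's API, here is a minimal sketch in core Perl. It is not how Mojo::IOLoop::ReadWriteFork is used; the run_worker helper and the one-JSON-document-per-line format are assumptions made for this example.

```perl
use strict;
use warnings;
use IO::Handle;
use JSON::PP qw(encode_json decode_json);
use POSIX ();

# Fork one worker. The child writes one JSON document per line to a pipe;
# the parent decodes each line and hands it to $on_progress.
sub run_worker {
    my ($work, $on_progress) = @_;
    pipe(my $reader, my $writer) or die "pipe: $!";
    my $pid = fork() // die "fork: $!";
    if ($pid == 0) {    # child
        close $reader;
        $writer->autoflush(1);    # deliver each line as soon as it is printed
        $work->(sub { print {$writer} encode_json($_[0]), "\n" });
        close $writer;
        POSIX::_exit(0);          # leave without running the parent's cleanup
    }
    close $writer;      # parent
    while (my $line = <$reader>) {
        $on_progress->(decode_json($line));
    }
    close $reader;
    waitpid $pid, 0;
    return $? >> 8;     # exit status of the child
}

my @received;
my $status = run_worker(
    sub {
        my ($emit) = @_;
        for my $cell (1 .. 3) {
            # the real per-row processing would go here
            $emit->({ type => 'ticket', cell => $cell, result => $cell % 2 ? 'OK' : 'NOK' });
        }
    },
    sub { push @received, $_[0] },
);
printf "worker exited with %d after %d messages\n", $status, scalar @received;
```

Note that the parent's while loop here is itself blocking. Inside a Mojolicious application the read side has to be driven by the event loop instead, which is the part a module like ReadWriteFork takes care of.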

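I have made a few small changes to your updated example to make it work as expected. You can use the progress feature of the Subprocess module to make sure the correct data is sent asynchronously over the websocket from a long-running subprocess.

The code now works as I expect: each time the subprocess goes through an iteration, the table state is updated on the client side.

The relevant part of the source code looks like this: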

$ws->on(
    json => sub {
        my ( $ws, $msg ) = @_;
        app->log->debug( 'Received from websocket:', Dumper( $msg ) );
        unless ($msg) {
            app->log->debug('Received empty message? WTF?!');
            return;
        }
        my $prompt = $msg->{cmd};
        return unless $prompt;
        app->log->debug( sprintf 'Received: `%s`', $prompt // '<empty??>' );

        # my $loop = Mojo::IOLoop->singleton;
        my $subprocess = Mojo::IOLoop::Subprocess->new;
        app->log->debug("we are pid $$");
        $subprocess->run(
            sub {
                my $sp = shift;
                for my $cell ( 1 .. 3 ) {
                    app->log->debug(
                        "starting process for cell $cell in pid $$");
                    sleep(2);
                    app->log->debug("sending cell $cell to ws");
                    my $payload = {
                        type   => 'ticket',
                        cell   => $cell,
                        result => $cell % 2 ? 'OK' : 'NOK'
                    };
                    $sp->progress($payload);
                }
            },
            sub {
                my ( $sp, $err, @results ) = @_;

                #$ws->send( { json => $payload } );
                $ws->reply->exception($err) and return if $err;
                app->log->debug('sending end of process ->websocket');
                $ws->send( { json => { type => 'end' } } );
            }
        );

        # Forward each progress payload from the child to the client
        $subprocess->on(
            progress => sub {
                my ( $subprocess, $payload ) = @_;
                $ws->send( { json => $payload } );
            }
        );

        # Start event loop if necessary
        $subprocess->ioloop->start unless $subprocess->ioloop->is_running;
    }
);