关闭连接到 Mojo websocket 的 Mojo::IOLoop 重复事件

Shutting down a Mojo::IOLoop recurring event connected to a Mojo websocket

我正在玩弄 Mojolicious 和 websockets。我想将服务器上的多个外部命令的输出发送到网页。我在连接和接收消息方面没有问题,但我还想将消息发送回服务器以停止外部命令,同时让其他人继续将消息发送回客户端。我也想在退出后停止检查外部命令。

外部命令只是一行,每隔几秒吐出一个整数。我有两个 websocket,分别在 div 中显示数字。单击任一停止按钮都会发送消息,但这就是我需要弄清楚如何关闭该 websocket(并且仅关闭该 websocket)并关闭外部命令的地方。

当我连接websocket时,我运行外部命令并设置一个Mojo::IOLoop->recurring来检查是否有输出。

当我想停止时,我想我应该调用 Mojo::IOLoop->remove($id),但这似乎并没有完全删除它,而且我收到类似 Mojo::Reactor::Poll: Timer failed: Can't call method "is_websocket" on an undefined value.

的错误消息

如果我在控制器对象上调用 finish 来关闭 websocket,它似乎会停止一切。

我有整个 Mojolicious::Lite app as a gist,但这是我

use feature qw(signatures);
no warnings qw(experimental::signatures);
## other boilerplate redacted

websocket '/find' => sub ( $c ) {
    state $loop = Mojo::IOLoop->singleton;

    app->log->debug( "websocket for find" );
    $c->inactivity_timeout( 50 );

    my $id;
    $c->on( message => sub ( $ws, $message ) {
        my $json = decode_json( $message );
        my $command = $json->{c};
        my $name    = $json->{n};

        app->log->debug( "Got $command command for $name" );
        if( $command eq "start" ) {
            $id = run_command( $ws );
            app->log->debug( "run_command for $name returned [$id]" );
            }
        elsif( $command eq "stop" ) {
            app->log->debug( "stopping loop for $name [$id]" );
            # XXX What should I do here?
            # $ws->finish;
            # $loop->remove( $id );
            }
        elsif( $command eq "open" ) {
            app->log->debug( "opening websocket for $name" );
            }
        }
        );
     $c->on(
        finish => sub ( $c, $code ) {
            app->log->debug("WebSocket closed with status $code");
            }
        );
    };

app->start;

sub run_command ( $ws ) {
    app->log->debug( "In run_command: $ws" );
    open my $fh, "$^X -le '$|++; while(1) { print int rand(100); sleep 3 }' |";
    $fh->autoflush;

    my $id;
    $id = Mojo::IOLoop->recurring( 1 => sub ($loop) {
        my $m = <$fh>;
        unless( defined $m ) {
            app->log->debug( "Closing down recurring loop from the inside [$id]" );
            # XXX: what should I do here?
            close $fh;
            return;
            };
        chomp $m;
        app->log->debug( "Input [$m] for [$id] from $fh" );
        $ws->send( encode_json( { 'm' => $m } ) );
        });

    return $id;
    }

其他可能从这个答案中受益的问题:

我认为您阻塞了事件循环,因为您每秒循环调用一次并且 my $m = <$fh>; 等待大约 2-3 秒的结果。所以你阻止了事件循环。 我认为是这样,因为当我 运行 你的应用程序事件 finish 不是调用不活动超时,而是调用事件 recurrentfinish 事件必须始终调用不活动超时。

我认为你的代码必须在单独的进程中以避免阻塞事件循环。

尝试使用this模块在单独的进程中执行。 我write小例子。

我试了一下。 让我觉得我不应该自己轮询或处理文件句柄细节。我还是不知道挂在哪里。

相反,我使用 Mojo::Reactorio 来设置要监视的文件句柄:

sub run_command ( $ws ) {
    my $pid = open my $fh, "$^X -le '$|++; print $$; while(1) { print int rand(100); sleep 3 }' |";
    $fh->autoflush;

    my $reactor = Mojo::IOLoop->singleton->reactor->io(
        $fh => sub ($reactor, $writeable) {
            my $m = <$fh>;
            chomp $m;
            $ws->send( encode_json( { 'm' => $m } ) );
            }
        );

    return ( $fh, $pid );
    }

当我完成该命令后,我可以取消监视该文件句柄并终止该进程。我完成了 websocket:

    elsif( $command eq "stop" ) {
        $loop->reactor->watch( $fh, 0, 0 );
        kill 'KILL', $pid or app->log->debug( "Could not kill $pid: $!" );
        $ws->finish;
        }

我仍然不知道为什么 remove($fh) 不起作用。我想我这样做会泄露一些 IOLoop 的东西。