Mojolicous:限制 Promises / IOLoop->subprocess 的数量

Mojolicous: Limiting number of Promises / IOLoop->subprocess

我正在使用 Mojolicious 非阻塞方法(Promises)从外部系统请求数据。 1)我想立即通知用户进程已经开始; 2) 我想扩展这个程序。

下面的代码适用于一小部分数字(几百个),数字更多,我得到一个错误[error] Can't create pipe: Too many open files at /path/lib/perl5/Mojo/IOLoop.pm line 156. 问题1)我如何限制我生成的 Promises 数量(下面代码中的 map):

#!/usr/bin/env perl

use Mojolicious::Lite;
use Mojolicious::Plugin::TtRenderer;

sub isPrime
{
    my ($n) = @_;
    my $e = sqrt($n);
    for (my $i=2; $i<$e; $i++) {
        return 0 if $n%$i==0;
    }
    return 1;
}

sub makeApromise
{
    my ($number) = @_;

    my $promise = Mojo::Promise->new;
    Mojo::IOLoop->subprocess(
    sub {  # first callback is executed in subprocess
        my %response;
        # Simulate a long computational process
        $response{'number'}  = $number;
        $response{'isPrime'} = isPrime($number);
        return \%response;
    },
        sub {  # second callback resolves promise with subprocess result
            my ($self, $err, @result) = @_;
            return $promise->reject($err) if $err;
            $promise->resolve(@result);
        },
    );
    return $promise;
}

plugin 'tt_renderer'; # automatically render *.html.tt templates

any '/' => sub {
    my ($self) = @_;
    my $lines = $self->param( 'textarea' );

    if ($lines) {
    my @numbers;
    foreach my $number (split(/\r?\n/, $lines)) {
        push(@numbers, $number) if $number =~ /^\d+$/;
    }
    if (@numbers) {
        ####################################
        ### This is the problem below... ###
        my @promises = map { makeApromise($_) } @numbers;
        ####################################
        # MojoPromise Wait
        Mojo::Promise->all(@promises)
        ->then(sub {
            my @values = map { $_->[0] } @_;
            foreach my $response (@values) {
            #print STDERR $response->{'number'}, " => ", $response->{'isPrime'}, "\n";
            # Prepare email...
            }
            # Send an email...
               })
        #->wait # Don't wait? I want to tell the user to wait for an email as quickly as possible...
        if @promises;
    }
    $self->stash(done => "1",);
    }
    $self->render(template => 'index', format => 'html', handler => 'tt');
};

app->start;
__DATA__

@@ index.html.tt
<!DOCTYPE html>
<html lang="en">
  <head>
    <title>Make A Promise</title>
  </head>
  <body>
    [% IF done %]
    <h3>Thank you! You will receive an email shortly with the results.</h3>
    [% ELSE %]
    <h3>Enter numbers...</h3>
    <form role="form" action="/" method="post">
      <textarea name="textarea" rows="5" autofocus required></textarea>
      <button type="submit">Submit</button>
    </form>
    [% END %]
  </body>
</html>

我注释掉了wait;但是,代码似乎仍在阻塞。 问题2) 如何立即通知用户进程已经启动? (即当我 stash done 变量时)

问题不在于承诺的数量,而在于子流程的数量。限制这种情况的一种方法是简单地限制您在程序逻辑中一次创建的数量。与其在地图中一次生成它们,不如设置一个限制并从@numbers 中检索那么多(可能使用 splice)并生成这些子进程;创建一个等待它们的 ->all 承诺,然后将 ->then 附加到该承诺以检索下一个数字块,依此类推。

另一种选择是使用 Future::Utils fmap_concat which can take care of the rate-limiting code by have you provide a number of the maximum outstanding Futures. Your promise-returning function can apply Mojo::Promise::Role::Futurify 链接以下 Future 以这种方式使用。

#!/usr/bin/env perl

use Mojolicious::Lite;
use Mojo::File 'path';
use Mojo::IOLoop;
use Mojo::Promise;
use Future::Utils 'fmap_concat';

get '/' => sub {
  my $c = shift;
  my $count = $c->param('count') // 0;
  my @numbers = 1..$count;

  if (@numbers) {
    my $result_f = fmap_concat {
      my $number = shift;
      my $p = Mojo::Promise->new;
      Mojo::IOLoop->subprocess(sub {
        sleep 2;
        return $number+1;
      }, sub {
        my ($subprocess, $err, @result) = @_;
        return $p->reject($err) if $err;
        $p->resolve(@result);
      });
      return $p->with_roles('Mojo::Promise::Role::Futurify')->futurify;
    } foreach => \@numbers, concurrent => 20;

    $result_f->on_done(sub {
      my @values = @_;
      foreach my $response (@values) {
        $c->app->log->info($response);
      }
    })->on_fail(sub {
      my $error = shift;
      $c->app->log->fatal($error);
    })->retain;

    $c->stash(done => 1);
  }
  $c->render(text => "Processing $count numbers\n");
};

app->start;

至于 wait 方法,当事件循环已经 运行ning 时,它什么都不做,如果您在 Mojolicious 守护程序中启动应用程序(而不是不支持异步响应的 PSGI 或 CGI 服务器)。在设置子流程后,回调之外的 ->stash 和 ->render 调用将立即 运行。然后响应处理程序将完成,事件循环将再次获得控制权,一旦承诺解决,它将触发适当的 ->then 回调。渲染不应该等待子进程设置之外的任何事情;既然你说可能有数百个,那可能就是你正在经历的放缓。确保您使用的是 Mojolicious 7.86 或更新版本,因为 Subprocess 已更改,以便在事件循环的下一个滴答声(响应处理程序完成后)之前不会发生分叉。

我还会注意到,子流程并不是真正为此而设计的;它们被设计用于执行缓慢的代码,这些代码仍然 returns 在响应中对浏览器的最终结果(并且 Mojolicious::Plugin::Subprocess is nice for this use case). One problem I can see is that if you restart the application, any still pending subprocesses will just be ignored. For jobs that you want to set off and forget, you might consider a job queue like Minion 与 Mojolicious 应用程序有很好的集成,并且 运行s 通过单独的工作人员过程。