Perl:带有 Coro 和 AnyEvent 的多线程服务器

Perl: multithreaded server with Coro and AnyEvent

我是 Perl 的新手,因此出于教育目的,我正在使用 AnyEvent 和 Coro 开发多线程服务器。客户端向服务器发送目录路径列表和带有这些目录列表的服务器响应。

我正在使用 tcp_server 和 AnyEvent::Handle 进行连接处理,对于每个客户端,我希望服务器检查线程池(实际上是 coros 池)以获取可用的 coro 来处理请求。处理请求完成后,我希望 coro 等待另一个客户端而不是完成。

然而,似乎在 handle_request sub 的末尾,coro 被破坏并且不再可用。

#!/usr/bin/perl

use strict;
use v5.18;

use AnyEvent;                           
use AnyEvent::Socket qw(tcp_server);    
use AnyEvent::Handle;                  

use Coro;

use Class::Struct;

print("Server is running...\n");

# dirs function

sub print_dirs {

    my $dir_list = $_[0];
    my @dirs = split(" ", $dir_list);
    my $result = "";

    for my $dir (@dirs) {
        if (opendir my $dirent, $dir) {
            my @files = readdir $dirent;
            closedir $dirent;
            $result = $result . "\nContents of $dir:\r\n" . join("\r\n", @files) . "\r\n";
        } else {
            $result = $result . "Failed to open $dir: $!\r\n";
        }
    }

    return $result;
}

# thread struct
struct clt_thread => {
    id => '$',
    thread => '$',
    is_busy => '$',
    args => '$',
    client => '$',
    clt_key => '$'
};

my $threads_num = 16;
my $thread_id = 0;
my @pool = ();

# handling request

my $cv = AE::cv;
my %client = ();

sub handle_request {

        my $thread_id = shift;
        my $thread;
        foreach my $thr (@pool) {
            if ($thr->id == $thread_id) { $thread = $thr; }
        }
        my $self = $thread->client;
        my $client_key = $thread->clt_key;
        my $dir_list = $thread->args;
        if ($thread->client != '') {

            say "Directories read: " . $dir_list . "\n";
            my @clients = keys %client;

            for my $key (grep {$_ ne $client_key} @clients) {
                my $response = print_dirs($dir_list);
                $client{$key}->push_write("$response");
                $self->push_shutdown;
                delete $client{$client_key};
                delete $client{$self};
            }

        }
        $thread->is_busy(0);
        Coro::cede();
}

# threads creation

for my $i (0..$threads_num) {
    my $coro = new Coro(\&handle_request, $thread_id);
    my $thread = clt_thread->new(id => $thread_id, thread => $coro, is_busy => 0, args => '', client => '', clt_key => '');
    push @pool, $thread;
    $thread_id = $thread_id+1;
}

# tcp server creation - main part

tcp_server '127.0.0.1', 8015, sub {

    my ($fh, $host, $port) = @_;
    my $client_key = "$host:$port";

    my $hdl = AnyEvent::Handle->new(
            fh => $fh,
            poll => 'r',
            on_read => sub {
                    my ($self) = @_;

                    foreach my $thr (@pool) {
                        if (!($thr->is_busy)) {
                        $thr->client($self);
                        $thr->args($self->rbuf); 
                        $thr->clt_key($client_key);
                        $thr->is_busy(1);
                        $thr->thread->ready();
                        return;
                        }   
                    }
            },
            on_error => sub {
                say "Something went wrong: $!\n";
            },
    );

    $client{$client_key} = $hdl;
    $client{$hdl} = $hdl;
};

$cv->recv;

我已经尝试在 handle_request 中使用无限循环,但这样一来一切都停止了。你知道如何解决这个问题吗?我想使用 Coro::AnyEvent 将协程集成到事件循环中可能是解决方案。它对我的情况有帮助吗?

感谢您的帮助。

线程在handle_request退出时退出,所以你想将handle_request的主体包裹在一个无限循环中。

您还想使用 Coro::schedule; 而不是 Coro::cede; 来等待 ->ready 在继续之前被再次调用。

handle_request 中的第一个循环可以减少到 my $thread = $pool[$thread_id];

未经测试的修正:

sub handle_request {
   my ($thread_id) = @_;
   my $thread = $pool[$thread_id];

   while (1) {
      my $self       = $thread->client;
      my $client_key = $thread->clt_key;
      my $dir_list   = $thread->args;

      ...

      $thread->is_busy(0);
      Coro::schedule();
   }
}

也就是说,以下是我将使用的方法:

use Coro;
use Coro::Channel;

use constant NUM_WORKERS => 16;

sub worker {
   my ($job) = @_;

   my $self       = $job->client;
   my $client_key = $job->clt_key;
   my $dir_list   = $job->args;

   ...
}

{
   my $q = Coro::Channel->new();

   my @threads =
      map {
         async {
            while ( my $job = $q->get() ) {
               eval { worker($job); 1 }
                  or warn $@;
            }
         }
      }
         1..NUM_WORKERS;

   ...
   on_read => sub {
      my ($self) = @_;
      $q->put({
         client  => $self,
         clt_key => $client_key,
         args    => $self->rbuf,
      });
   }
   ...

   $cv->recv;
   $q->shutdown;
   $_->join for @threads;
}

这与我在实际线程中使用的方法相同(使用 Thread::Queue 而不是 Coro::Channel)。