perl Coro 共享数组

perl Coro shared array

我在编程方面有点新手,现在对一个简单的任务有点困惑(相当多)——在 coro 协程之间共享一个数组。

类似这样的事情(不工作,只需要 40 秒即可完成)...

my @array = (1..1000);
$|=0;

sub start_thread($) {
    my $url = shift;
    return async {
        print "starting  $array[0]\n";
        shift @array;
        sleep(2);
      };
}

sub main{
  start_thread $_ for (1..20);
  EV::loop;
}

main;

我读过 IPC::Sharelite 和 IPC::Share 并尝试了一些东西,但没有任何效果... 如果有人可以清楚地指出我好吗?

Coro 是一个协作式多任务处理系统。这意味着一次只有一个任务可以 运行,你可以通过使用事件系统(例如 $cv->recv)而不是阻塞调用(例如 sleep).

use AE   qw( );
use Coro qw( async );

sub ae_sleep {
   my ($secs) = @_;
   my $cv = AE::cv();
   my $guard = AE::timer($secs, 0, $cv);
   $cv->recv();
}

sub worker {
   my ($job) = @_;
   my $id = sprintf("Thread %d: Job %s", $Coro::current, $job);
   print("$id: Running...\n");
   ae_sleep(2);
   print("$id: done.\n");
}

{
   my @array = 1..60;

   my @threads;
   for (1..20) {
      push @threads, async {
         while (defined(my $job = shift(@array))) {
            worker($job);
         }
      };
   }

   $_->join() for @threads;
}

如果您使用 Coro::Channel 而不是数组,您可以在启动工作程序后将元素添加到队列中。

use AE            qw( );
use Coro          qw( async );
use Coro::Channel qw( );

sub ae_sleep { ... }  # Same as above

sub worker { ... }  # Same as above

{
   my $q = Coro::Channel->new();

   my @threads;
   for (1..20) {
      push @threads, async {
         while (defined(my $job = $q->get())) {
            worker($job);
         }
      };
   }

   $q->put($_) for 1..60;

   $q->shutdown();
   $_->join() for @threads;
}

如果你想使用真正的线程,很简单:

use threads;
use Thread::Queue qw( );  # 3.01+

sub worker {
   my ($job) = @_;
   my $id = sprintf("Thread %d: Job %s", threads->tid, $job);
   print("$id: Running...\n");
   sleep(2);
   print("$id: done.\n");
}

{
   my $q = Thread::Queue->new();

   my @threads;
   for (1..20) {
      push @threads, async {
         while (defined(my $job = $q->dequeue())) {
            worker($job);
         }
      };
   }

   $q->enqueue($_) for 1..60;

   $q->end();
   $_->join() for @threads;
}