perl Coro 共享数组
perl Coro shared array
我在编程方面有点新手,现在对一个简单的任务有点困惑(相当多)——在 coro 协程之间共享一个数组。
类似这样的事情(不工作,只需要 40 秒即可完成)...
my @array = (1..1000);
$|=0;
sub start_thread($) {
my $url = shift;
return async {
print "starting $array[0]\n";
shift @array;
sleep(2);
};
}
sub main{
start_thread $_ for (1..20);
EV::loop;
}
main;
我读过 IPC::Sharelite 和 IPC::Share 并尝试了一些东西,但没有任何效果...
如果有人可以清楚地指出我好吗?
Coro 是一个协作式多任务处理系统。这意味着一次只有一个任务可以 运行,你可以通过使用事件系统(例如 $cv->recv
)而不是阻塞调用(例如 sleep
).
use AE qw( );
use Coro qw( async );
sub ae_sleep {
my ($secs) = @_;
my $cv = AE::cv();
my $guard = AE::timer($secs, 0, $cv);
$cv->recv();
}
sub worker {
my ($job) = @_;
my $id = sprintf("Thread %d: Job %s", $Coro::current, $job);
print("$id: Running...\n");
ae_sleep(2);
print("$id: done.\n");
}
{
my @array = 1..60;
my @threads;
for (1..20) {
push @threads, async {
while (defined(my $job = shift(@array))) {
worker($job);
}
};
}
$_->join() for @threads;
}
如果您使用 Coro::Channel 而不是数组,您可以在启动工作程序后将元素添加到队列中。
use AE qw( );
use Coro qw( async );
use Coro::Channel qw( );
sub ae_sleep { ... } # Same as above
sub worker { ... } # Same as above
{
my $q = Coro::Channel->new();
my @threads;
for (1..20) {
push @threads, async {
while (defined(my $job = $q->get())) {
worker($job);
}
};
}
$q->put($_) for 1..60;
$q->shutdown();
$_->join() for @threads;
}
如果你想使用真正的线程,很简单:
use threads;
use Thread::Queue qw( ); # 3.01+
sub worker {
my ($job) = @_;
my $id = sprintf("Thread %d: Job %s", threads->tid, $job);
print("$id: Running...\n");
sleep(2);
print("$id: done.\n");
}
{
my $q = Thread::Queue->new();
my @threads;
for (1..20) {
push @threads, async {
while (defined(my $job = $q->dequeue())) {
worker($job);
}
};
}
$q->enqueue($_) for 1..60;
$q->end();
$_->join() for @threads;
}
我在编程方面有点新手,现在对一个简单的任务有点困惑(相当多)——在 coro 协程之间共享一个数组。
类似这样的事情(不工作,只需要 40 秒即可完成)...
my @array = (1..1000);
$|=0;
sub start_thread($) {
my $url = shift;
return async {
print "starting $array[0]\n";
shift @array;
sleep(2);
};
}
sub main{
start_thread $_ for (1..20);
EV::loop;
}
main;
我读过 IPC::Sharelite 和 IPC::Share 并尝试了一些东西,但没有任何效果... 如果有人可以清楚地指出我好吗?
Coro 是一个协作式多任务处理系统。这意味着一次只有一个任务可以 运行,你可以通过使用事件系统(例如 $cv->recv
)而不是阻塞调用(例如 sleep
).
use AE qw( );
use Coro qw( async );
sub ae_sleep {
my ($secs) = @_;
my $cv = AE::cv();
my $guard = AE::timer($secs, 0, $cv);
$cv->recv();
}
sub worker {
my ($job) = @_;
my $id = sprintf("Thread %d: Job %s", $Coro::current, $job);
print("$id: Running...\n");
ae_sleep(2);
print("$id: done.\n");
}
{
my @array = 1..60;
my @threads;
for (1..20) {
push @threads, async {
while (defined(my $job = shift(@array))) {
worker($job);
}
};
}
$_->join() for @threads;
}
如果您使用 Coro::Channel 而不是数组,您可以在启动工作程序后将元素添加到队列中。
use AE qw( );
use Coro qw( async );
use Coro::Channel qw( );
sub ae_sleep { ... } # Same as above
sub worker { ... } # Same as above
{
my $q = Coro::Channel->new();
my @threads;
for (1..20) {
push @threads, async {
while (defined(my $job = $q->get())) {
worker($job);
}
};
}
$q->put($_) for 1..60;
$q->shutdown();
$_->join() for @threads;
}
如果你想使用真正的线程,很简单:
use threads;
use Thread::Queue qw( ); # 3.01+
sub worker {
my ($job) = @_;
my $id = sprintf("Thread %d: Job %s", threads->tid, $job);
print("$id: Running...\n");
sleep(2);
print("$id: done.\n");
}
{
my $q = Thread::Queue->new();
my @threads;
for (1..20) {
push @threads, async {
while (defined(my $job = $q->dequeue())) {
worker($job);
}
};
}
$q->enqueue($_) for 1..60;
$q->end();
$_->join() for @threads;
}