在特定示例中了解 perl 中的异步
Understanding async in perl on specific example
我必须编写一个脚本来并行获取一些 URL 并做一些工作。过去我总是使用 Parallel::ForkManager
来做这样的事情,但现在我想学习一些新东西并尝试使用 AnyEvent
(以及 AnyEvent::HTTP
或 AnyEvent::Curl::Multi
)进行异步编程.. .但我在理解 AnyEvent 和编写应该:
的脚本时遇到问题
- 打开一个文件(每一行都是一个单独的URL)
- (从现在开始并行,但有 f.e 的限制。10 个并发请求)
- 逐行读取文件(我不想将整个文件加载到内存中 - 它可能很大)
- 为此 URL
发出 HTTP 请求
- 阅读回复
- 相应更新MySQL记录
- (下一个文件行)
我已经阅读了很多手册、教程,但我仍然很难理解阻塞代码和非阻塞代码之间的区别。我在 http://perlmaven.com/fetching-several-web-pages-in-parallel-using-anyevent 找到了类似的脚本,Szabo 先生在那里解释了基础知识,但我仍然无法理解如何实现类似的脚本:
...
open my $fh, "<", $file;
while ( my $line = <$fh> )
{
# http request, read response, update MySQL
}
close $fh
...
...并在这种情况下添加并发限制。
我将不胜感激 ;)
更新
听从池上的建议,我尝试了 Net::Curl::Multi
。我对结果非常满意。经过多年使用 Parallel::ForkManager
并发抓取数千个 URLs,Net::Curl::Multi
似乎很棒。
这是我的代码,在文件句柄上有 while
循环。它似乎可以正常工作,但考虑到这是我第一次写这样的东西,我想请更多有经验的 Perl 用户看一下,并告诉我是否有一些潜在的错误,我错过了什么,等等。
另外,如果我可能会问:因为我不完全理解 Net::Curl::Multi
的并发性是如何工作的,请告诉我在使用 MySQL UPDATE 命令(通过 DBI
) 在 RESPONSE
循环内(显然除了更高的服务器负载 - 我希望最终脚本 运行 有大约 50 个并发 N::C::M
工作人员,也许更多)。
#!/usr/bin/perl
use Net::Curl::Easy qw( :constants );
use Net::Curl::Multi qw( );
sub make_request {
my ( $url ) = @_;
my $easy = Net::Curl::Easy->new();
$easy->{url} = $url;
$easy->setopt( CURLOPT_URL, $url );
$easy->setopt( CURLOPT_HEADERDATA, $easy->{head} );
$easy->setopt( CURLOPT_FILE, $easy->{body} );
return $easy;
}
my $maxWorkers = 10;
my $multi = Net::Curl::Multi->new();
my $workers = 0;
my $i = 1;
open my $fh, "<", "urls.txt";
LINE: while ( my $url = <$fh> )
{
chomp( $url );
$url .= "?$i";
print "($i) $url\n";
my $easy = make_request( $url );
$multi->add_handle( $easy );
$workers++;
my $running = 0;
do {
my ($r, $w, $e) = $multi->fdset();
my $timeout = $multi->timeout();
select $r, $w, $e, $timeout / 1000
if $timeout > 0;
$running = $multi->perform();
RESPONSE: while ( my ( $msg, $easy, $result ) = $multi->info_read() ) {
$multi->remove_handle( $easy );
$workers--;
printf( "%s getting %s\n", $easy->getinfo( CURLINFO_RESPONSE_CODE ), $easy->{url} );
}
# dont max CPU while waiting
select( undef, undef, undef, 0.01 );
} while ( $workers == $maxWorkers || ( eof && $running ) );
$i++;
}
close $fh;
Net::Curl 是一个相当不错的库,速度非常快。此外,它还可以处理并行请求!我建议使用它而不是 AnyEvent。
use Net::Curl::Easy qw( :constants );
use Net::Curl::Multi qw( );
sub make_request {
my ( $url ) = @_;
my $easy = Net::Curl::Easy->new();
$easy->{url} = $url;
$easy->setopt( CURLOPT_URL, $url );
$easy->setopt( CURLOPT_HEADERDATA, $easy->{head} );
$easy->setopt( CURLOPT_FILE, $easy->{body} );
return $easy;
}
my $max_running = 10;
my @urls = ( 'http://www.google.com/' );
my $multi = Net::Curl::Multi->new();
my $running = 0;
while (1) {
while ( @urls && $running < $max_running ) {
my $easy = make_request( shift( @urls ) );
$multi->add_handle( $easy );
++$running;
}
last if !$running;
my ( $r, $w, $e ) = $multi->fdset();
my $timeout = $multi->timeout();
select( $r, $w, $e, $timeout / 1000 )
if $timeout > 0;
$running = $multi->perform();
while ( my ( $msg, $easy, $result ) = $multi->info_read() ) {
$multi->remove_handle( $easy );
printf( "%s getting %s\n", $easy->getinfo( CURLINFO_RESPONSE_CODE ), $easy->{url} );
}
}
这正是您想要的,以异步方式,它通过以安全方式包装 Net::Curl
来实现:
#!/usr/bin/env perl
package MyDownloader;
use strict;
use warnings qw(all);
use Moo;
extends 'YADA::Worker';
has '+use_stats'=> (default => sub { 1 });
has '+retry' => (default => sub { 10 });
after init => sub {
my ($self) = @_;
$self->setopt(
encoding => '',
verbose => 1,
);
};
after finish => sub {
my ($self, $result) = @_;
if ($self->has_error) {
print "ERROR: $result\n";
} else {
# do the interesting stuff here
printf "Finished downloading %s: %d bytes\n", $self->final_url, length ${$self->data};
}
};
around has_error => sub {
my $orig = shift;
my $self = shift;
return 1 if $self->$orig(@_);
return 1 if $self->getinfo('response_code') =~ m{^5[0-9]{2}$}x;
};
1;
package main;
use strict;
use warnings qw(all);
use Carp;
use YADA;
my $q = YADA->new(
max => 8,
timeout => 30,
);
open(my $fh, '<', 'file_with_urls_per_line.txt')
or croak "can't open queue: $!";
while (my $url = <$fh>) {
chomp $url;
$q->append(sub {
MyDownloader->new($url)
});
}
close $fh;
$q->wait;
我必须编写一个脚本来并行获取一些 URL 并做一些工作。过去我总是使用 Parallel::ForkManager
来做这样的事情,但现在我想学习一些新东西并尝试使用 AnyEvent
(以及 AnyEvent::HTTP
或 AnyEvent::Curl::Multi
)进行异步编程.. .但我在理解 AnyEvent 和编写应该:
- 打开一个文件(每一行都是一个单独的URL)
- (从现在开始并行,但有 f.e 的限制。10 个并发请求)
- 逐行读取文件(我不想将整个文件加载到内存中 - 它可能很大)
- 为此 URL 发出 HTTP 请求
- 阅读回复
- 相应更新MySQL记录
- (下一个文件行)
我已经阅读了很多手册、教程,但我仍然很难理解阻塞代码和非阻塞代码之间的区别。我在 http://perlmaven.com/fetching-several-web-pages-in-parallel-using-anyevent 找到了类似的脚本,Szabo 先生在那里解释了基础知识,但我仍然无法理解如何实现类似的脚本:
...
open my $fh, "<", $file;
while ( my $line = <$fh> )
{
# http request, read response, update MySQL
}
close $fh
...
...并在这种情况下添加并发限制。
我将不胜感激 ;)
更新
听从池上的建议,我尝试了 Net::Curl::Multi
。我对结果非常满意。经过多年使用 Parallel::ForkManager
并发抓取数千个 URLs,Net::Curl::Multi
似乎很棒。
这是我的代码,在文件句柄上有 while
循环。它似乎可以正常工作,但考虑到这是我第一次写这样的东西,我想请更多有经验的 Perl 用户看一下,并告诉我是否有一些潜在的错误,我错过了什么,等等。
另外,如果我可能会问:因为我不完全理解 Net::Curl::Multi
的并发性是如何工作的,请告诉我在使用 MySQL UPDATE 命令(通过 DBI
) 在 RESPONSE
循环内(显然除了更高的服务器负载 - 我希望最终脚本 运行 有大约 50 个并发 N::C::M
工作人员,也许更多)。
#!/usr/bin/perl
use Net::Curl::Easy qw( :constants );
use Net::Curl::Multi qw( );
sub make_request {
my ( $url ) = @_;
my $easy = Net::Curl::Easy->new();
$easy->{url} = $url;
$easy->setopt( CURLOPT_URL, $url );
$easy->setopt( CURLOPT_HEADERDATA, $easy->{head} );
$easy->setopt( CURLOPT_FILE, $easy->{body} );
return $easy;
}
my $maxWorkers = 10;
my $multi = Net::Curl::Multi->new();
my $workers = 0;
my $i = 1;
open my $fh, "<", "urls.txt";
LINE: while ( my $url = <$fh> )
{
chomp( $url );
$url .= "?$i";
print "($i) $url\n";
my $easy = make_request( $url );
$multi->add_handle( $easy );
$workers++;
my $running = 0;
do {
my ($r, $w, $e) = $multi->fdset();
my $timeout = $multi->timeout();
select $r, $w, $e, $timeout / 1000
if $timeout > 0;
$running = $multi->perform();
RESPONSE: while ( my ( $msg, $easy, $result ) = $multi->info_read() ) {
$multi->remove_handle( $easy );
$workers--;
printf( "%s getting %s\n", $easy->getinfo( CURLINFO_RESPONSE_CODE ), $easy->{url} );
}
# dont max CPU while waiting
select( undef, undef, undef, 0.01 );
} while ( $workers == $maxWorkers || ( eof && $running ) );
$i++;
}
close $fh;
Net::Curl 是一个相当不错的库,速度非常快。此外,它还可以处理并行请求!我建议使用它而不是 AnyEvent。
use Net::Curl::Easy qw( :constants );
use Net::Curl::Multi qw( );
sub make_request {
my ( $url ) = @_;
my $easy = Net::Curl::Easy->new();
$easy->{url} = $url;
$easy->setopt( CURLOPT_URL, $url );
$easy->setopt( CURLOPT_HEADERDATA, $easy->{head} );
$easy->setopt( CURLOPT_FILE, $easy->{body} );
return $easy;
}
my $max_running = 10;
my @urls = ( 'http://www.google.com/' );
my $multi = Net::Curl::Multi->new();
my $running = 0;
while (1) {
while ( @urls && $running < $max_running ) {
my $easy = make_request( shift( @urls ) );
$multi->add_handle( $easy );
++$running;
}
last if !$running;
my ( $r, $w, $e ) = $multi->fdset();
my $timeout = $multi->timeout();
select( $r, $w, $e, $timeout / 1000 )
if $timeout > 0;
$running = $multi->perform();
while ( my ( $msg, $easy, $result ) = $multi->info_read() ) {
$multi->remove_handle( $easy );
printf( "%s getting %s\n", $easy->getinfo( CURLINFO_RESPONSE_CODE ), $easy->{url} );
}
}
这正是您想要的,以异步方式,它通过以安全方式包装 Net::Curl
来实现:
#!/usr/bin/env perl
package MyDownloader;
use strict;
use warnings qw(all);
use Moo;
extends 'YADA::Worker';
has '+use_stats'=> (default => sub { 1 });
has '+retry' => (default => sub { 10 });
after init => sub {
my ($self) = @_;
$self->setopt(
encoding => '',
verbose => 1,
);
};
after finish => sub {
my ($self, $result) = @_;
if ($self->has_error) {
print "ERROR: $result\n";
} else {
# do the interesting stuff here
printf "Finished downloading %s: %d bytes\n", $self->final_url, length ${$self->data};
}
};
around has_error => sub {
my $orig = shift;
my $self = shift;
return 1 if $self->$orig(@_);
return 1 if $self->getinfo('response_code') =~ m{^5[0-9]{2}$}x;
};
1;
package main;
use strict;
use warnings qw(all);
use Carp;
use YADA;
my $q = YADA->new(
max => 8,
timeout => 30,
);
open(my $fh, '<', 'file_with_urls_per_line.txt')
or croak "can't open queue: $!";
while (my $url = <$fh>) {
chomp $url;
$q->append(sub {
MyDownloader->new($url)
});
}
close $fh;
$q->wait;