在特定示例中了解 perl 中的异步

Understanding async in perl on specific example

我必须编写一个脚本来并行获取一些 URL 并做一些工作。过去我总是使用 Parallel::ForkManager 来做这样的事情,但现在我想学习一些新东西并尝试使用 AnyEvent(以及 AnyEvent::HTTPAnyEvent::Curl::Multi)进行异步编程.. .但我在理解 AnyEvent 和编写应该:

的脚本时遇到问题

我已经阅读了很多手册、教程,但我仍然很难理解阻塞代码和非阻塞代码之间的区别。我在 http://perlmaven.com/fetching-several-web-pages-in-parallel-using-anyevent 找到了类似的脚本,Szabo 先生在那里解释了基础知识,但我仍然无法理解如何实现类似的脚本:

...
open my $fh, "<", $file;
while ( my $line = <$fh> )
{
# http request, read response, update MySQL
}
close $fh
...

...并在这种情况下添加并发限制。

我将不胜感激 ;)

更新

听从池上的建议,我尝试了 Net::Curl::Multi。我对结果非常满意。经过多年使用 Parallel::ForkManager 并发抓取数千个 URLs,Net::Curl::Multi 似乎很棒。 这是我的代码,在文件句柄上有 while 循环。它似乎可以正常工作,但考虑到这是我第一次写这样的东西,我想请更多有经验的 Perl 用户看一下,并告诉我是否有一些潜在的错误,我错过了什么,等等。 另外,如果我可能会问:因为我不完全理解 Net::Curl::Multi 的并发性是如何工作的,请告诉我在使用 MySQL UPDATE 命令(通过 DBI ) 在 RESPONSE 循环内(显然除了更高的服务器负载 - 我希望最终脚本 运行 有大约 50 个并发 N::C::M 工作人员,也许更多)。

#!/usr/bin/perl

use Net::Curl::Easy  qw( :constants );
use Net::Curl::Multi qw( );

sub make_request {
    my ( $url ) = @_;
    my $easy = Net::Curl::Easy->new();
    $easy->{url} = $url;
    $easy->setopt( CURLOPT_URL,        $url );
    $easy->setopt( CURLOPT_HEADERDATA, $easy->{head} );
    $easy->setopt( CURLOPT_FILE,       $easy->{body} );
    return $easy;
}

my $maxWorkers = 10;

my $multi = Net::Curl::Multi->new();
my $workers = 0;

my $i = 1;
open my $fh, "<", "urls.txt";
LINE: while ( my $url = <$fh> )
{
    chomp( $url );
    $url .= "?$i";
    print "($i) $url\n";
    my $easy = make_request( $url );
    $multi->add_handle( $easy );
    $workers++;

    my $running = 0;
    do {
        my ($r, $w, $e) = $multi->fdset();
        my $timeout = $multi->timeout();
        select $r, $w, $e, $timeout / 1000
        if $timeout > 0;

        $running = $multi->perform();
        RESPONSE: while ( my ( $msg, $easy, $result ) = $multi->info_read() ) {
            $multi->remove_handle( $easy );
            $workers--;
            printf( "%s getting %s\n", $easy->getinfo( CURLINFO_RESPONSE_CODE ), $easy->{url} );
        }

        # dont max CPU while waiting
        select( undef, undef, undef, 0.01 );
    } while ( $workers == $maxWorkers || ( eof && $running ) );
    $i++;
}
close $fh;

Net::Curl 是一个相当不错的库,速度非常快。此外,它还可以处理并行请求!我建议使用它而不是 AnyEvent。

use Net::Curl::Easy  qw( :constants );
use Net::Curl::Multi qw( );

sub make_request {
    my ( $url ) = @_;
    my $easy = Net::Curl::Easy->new();
    $easy->{url} = $url;
    $easy->setopt( CURLOPT_URL,        $url );
    $easy->setopt( CURLOPT_HEADERDATA, $easy->{head} );
    $easy->setopt( CURLOPT_FILE,       $easy->{body} );
    return $easy;
}

my $max_running = 10;
my @urls = ( 'http://www.google.com/' );

my $multi = Net::Curl::Multi->new();
my $running = 0;
while (1) {
    while ( @urls && $running < $max_running ) {
       my $easy = make_request( shift( @urls ) );
       $multi->add_handle( $easy );
       ++$running;
    }

    last if !$running;

    my ( $r, $w, $e ) = $multi->fdset();
    my $timeout = $multi->timeout();
    select( $r, $w, $e, $timeout / 1000 )
        if $timeout > 0;

    $running = $multi->perform();
    while ( my ( $msg, $easy, $result ) = $multi->info_read() ) {
        $multi->remove_handle( $easy );
        printf( "%s getting %s\n", $easy->getinfo( CURLINFO_RESPONSE_CODE ), $easy->{url} );
    }
}

这正是您想要的,以异步方式,它通过以安全方式包装 Net::Curl 来实现:

#!/usr/bin/env perl

package MyDownloader;
use strict;
use warnings qw(all);

use Moo;

extends 'YADA::Worker';

has '+use_stats'=> (default => sub { 1 });
has '+retry'    => (default => sub { 10 });

after init => sub {
    my ($self) = @_;

    $self->setopt(
        encoding            => '',
        verbose             => 1,
    );
};

after finish => sub {
    my ($self, $result) = @_;

    if ($self->has_error) {
        print "ERROR: $result\n";
    } else {
        # do the interesting stuff here
        printf "Finished downloading %s: %d bytes\n", $self->final_url, length ${$self->data};
    }
};

around has_error => sub {
    my $orig = shift;
    my $self = shift;

    return 1 if $self->$orig(@_);
    return 1 if $self->getinfo('response_code') =~ m{^5[0-9]{2}$}x;
};

1;

package main;
use strict;
use warnings qw(all);

use Carp;

use YADA;

my $q = YADA->new(
    max     => 8,
    timeout => 30,
);

open(my $fh, '<', 'file_with_urls_per_line.txt')
    or croak "can't open queue: $!";
while (my $url = <$fh>) {
    chomp $url;

    $q->append(sub {
        MyDownloader->new($url)
    });
}
close $fh;
$q->wait;