How can I wait for the completion of http_request in AnyEvent::HTTP?
My program takes jobs from a database and runs AnyEvent::HTTP::http_request for each of them. How do I exit the program correctly once all requests have completed?
#!/usr/bin/perl
use strict;
use AnyEvent;
use AnyEvent::DBI;
use AnyEvent::HTTP;

my @queue;
my $added = 0;
my $finished = 0;
my $uid = int(rand(900000)) + 100000;
my ($run, $jobs, $log, $quit);
my $db = AnyEvent::DBI->new('DBI:mysql:dbname=;host=');
my $cv = AnyEvent->condvar;

$run = AnyEvent->timer(
    after => 5,
    interval => 0.1,
    cb => sub {
        if ($#queue != -1 && $added - $finished < 300) {
            my $job = shift @queue;
            my $r; $r = http_request(
                GET => $job->{url},
                sub {
                    undef $r;
                    my ($body, $header) = @_;
                    ...
                    $finished++;
                }
            );
            $added++;
        }
    }
);

$jobs = AnyEvent->timer(
    after => 0.1,
    interval => 5,
    cb => sub {
        if ($#queue < 1000) {
            $db->exec(q{UPDATE `jobs` SET `lock` = ? WHERE `lock` = 0 ORDER BY `time` ASC LIMIT 1000}, $uid, sub {
                $db->exec(q{SELECT * FROM `jobs` WHERE `lock` = ?}, $uid, sub {
                    my ($db, $rows, $rv) = @_;
                    push @queue, @$rows;
                    $db->exec(q{UPDATE `jobs` SET `lock` = 1 WHERE `lock` = ?}, $uid, sub { });
                });
            });
        }
    }
);

$log = AnyEvent->timer(
    after => 5,
    interval => 3,
    cb => sub {
        printf "Queue: %-6d Added: %-6d During: %-6d Total: %-6d\n", $#queue, $added, $added-$finished, $finished;
    }
);

$quit = AnyEvent->timer(
    after => 1,
    interval => 10,
    cb => sub {
        if (-f 'stop') {
            print "Exit\n";
            $cv->send;
        }
    }
);

my $result = $cv->recv;
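If you know a better way to queue jobs and execute them, please show your pattern for AnyEvent + AnyEvent::HTTP. I am using AnyEvent timers; is there a better and faster approach?
New version: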
#!/usr/bin/perl
use strict;
use AnyEvent;
use AnyEvent::DBI;
use AnyEvent::HTTP;

my $pid = int(rand(900000)) + 100000;
my $run = 1;
my $capacity = 300;
my $added = 0;
my $finished = 0;
my $cv = AnyEvent->condvar;
my $dbh = AnyEvent::DBI->new('DBI:mysql:dbname=;host=');

my $log = AnyEvent->timer(
    after => 1,
    interval => 3,
    cb => sub {
        printf "Added: %-6d Finished: %-6d Active: %-6d\n", $added, $finished, $AnyEvent::HTTP::ACTIVE if $finished;
        if ($run == 0 && $AnyEvent::HTTP::ACTIVE == 0) {
            $cv->send;
        }
    }
);

while (! -f 'stop') {
    my $done = AnyEvent->condvar;
    $dbh->exec(q{UPDATE `jobs` SET `lock` = ? WHERE `lock` = 0 ORDER BY `time` ASC LIMIT ?}, $pid, $capacity, sub {
        $dbh->exec(q{SELECT * FROM `jobs` WHERE `lock` = ?}, $pid, sub {
            my ($dbh, $rows, $rv) = @_;
            $dbh->exec(q{UPDATE `jobs` SET `lock` = 1 WHERE `lock` = ?}, $pid, sub {});
            $done->send($rows);
        });
    });
    my $jobs = $done->recv;
    my $done = AnyEvent->condvar;
    foreach my $job (@$jobs) {
        my $content;
        my $r; $r = http_request(
            GET => $job->{url},
            sub {
                undef $r;
                my ($body, $header) = @_;
                ...
                $dbh->exec(q{UPDATE `jobs` SET `lock` = 0, `time` = CURRENT_TIMESTAMP WHERE `id` = ?}, $job->{id}, sub {});
                $done->send if $AnyEvent::HTTP::ACTIVE < $capacity;
                $finished++;
            }
        );
        $added++;
    }
    $done->recv;
}
$run = 0;
$cv->recv;
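You basically need a three-state system:
- Running (accepting new jobs).
- Exiting (not accepting new jobs; waiting for existing jobs to complete).
- Exited (no jobs running).
Both of your versions fail because they only have two states:
- Running (accepting new jobs)
- Exited (jobs may still be running!!!)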
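To make the three states concrete, here is a minimal sketch of the transitions in plain Perl, with no event loop involved. The constant names and the next_state and may_accept_jobs helpers are my own illustrative names, not part of AnyEvent; the stop flag stands in for the 'stop' file check and the active count for the number of requests still in flight.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Hypothetical state names; the list above describes them only in prose.
use constant { RUNNING => 'running', EXITING => 'exiting', EXITED => 'exited' };

# Advance the state machine by one step.
#   $state           current state
#   $stop_requested  true once shutdown has been requested
#   $active          number of jobs still in flight
sub next_state {
    my ($state, $stop_requested, $active) = @_;
    if ($state eq RUNNING) {
        $state = EXITING if $stop_requested;   # stop accepting new jobs
    }
    if ($state eq EXITING) {
        $state = EXITED if $active == 0;       # nothing left to wait for
    }
    return $state;
}

# Only the RUNNING state may start new jobs.
sub may_accept_jobs { return $_[0] eq RUNNING }

# Walk through a shutdown that is requested while two jobs are in flight.
my $state = RUNNING;
for my $step ([0, 2], [1, 2], [1, 1], [1, 0]) {
    my ($stop_requested, $active) = @$step;
    $state = next_state($state, $stop_requested, $active);
    printf "stop=%d active=%d -> %s\n", $stop_requested, $active, $state;
}
```

The point of the middle state is that a stop request alone never ends the program; only a stop request combined with zero active jobs does.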
Here is a solution:
#!/usr/bin/perl
use strict;
use warnings;
use AE qw( );
use AnyEvent qw( );
use AnyEvent::DBI qw( );
use AnyEvent::HTTP qw( http_request );
use Scalar::Util qw( weaken );
use Sys::Hostname qw( hostname );

my $capacity = 300;
my $exit_request = AE::cv();
my $done = AE::cv();
my $exiting = 0;
my $grabbing = 0;
my $added = 0;
my $finished = 0;
my $uid = join('.', hostname(), $$, int(rand(65536)));
my $db = AnyEvent::DBI->new('DBI:mysql:dbname=;host=');

# Runs one job and calls $cb once it has completed.
sub worker {
    my ($job, $cb) = @_;
    http_request(
        GET => $job->{url},
        sub {
            my ($body, $header) = @_;
            ...
            $cb->();
        },
    );
    # Keep http_request in void context so that no cancellation guard is returned.
    return;
}

# Grabs new jobs while running; signals $done once exiting and idle.
sub manager {
    my $active = $added - $finished;

    if ($exiting) {
        $done->send() if !$active;
        return;
    }

    my $avail_slots = $capacity - $active;
    return if !$avail_slots;

    # Only one grab may be in flight at a time.
    return if $grabbing;
    $grabbing = 1;

    $db->exec(qq{UPDATE `jobs` SET `lock` = ? WHERE `status` = 'queued' AND `lock` = 0 ORDER BY `time` ASC LIMIT $avail_slots}, $uid, sub {
        $db->exec(q{SELECT * FROM `jobs` WHERE `status` = 'queued' AND `lock` = ?}, $uid, sub {
            my (undef, $jobs, undef) = @_;
            $db->exec(q{UPDATE `jobs` SET `status` = 'wip' WHERE `lock` = ?}, $uid, sub {
                $grabbing = 0;
                for my $job (@$jobs) {
                    ++$added;
                    worker($job, sub {
                        ++$finished;
                        $db->exec(q{UPDATE `jobs` SET `status` = 'done', `lock` = 0, `time` = CURRENT_TIMESTAMP WHERE `id` = ?}, $job->{id}, sub { });
                        manager();
                    });
                }
            });
        });
    });
}

my $db_poll_timer = AE::timer(5, 0.5, \&manager);

my $exit_check_timer = AE::timer(0, 2, sub {
    $exit_request->send() if -f 'stop';
});

my $log_timer = AE::timer(1, 3, sub {
    printf("Added: %-6d Finished: %-6d Active: %-6d\n",
        $added, $finished, $added-$finished);
});

# Running: wait until the 'stop' file appears.
$exit_request->recv();
print("Exiting...\n");
undef $exit_check_timer;

# Exiting: no new jobs are grabbed; wait for the active ones to finish.
$exiting = 1;
$done->recv();
print("Finished.\n");
undef $db_poll_timer;
undef $log_timer;
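Features:
- Removed @queue to avoid stealing work from other tasks.
- I separated the worker code from the worker management code.
- I removed all dependencies on the worker being an http_request.
- Supports long-running workers through timer-based polling (like your first version, but unlike your second version).
- Fixed a race condition in the job-grabbing code by using $grabbing (the race was present in your first version, but not in your second).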
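The $grabbing guard can be illustrated without a database. The sketch below is plain Perl under stated assumptions: fake_async_fetch and run_pending are hypothetical stand-ins for an asynchronous $db->exec and for the event loop, and poll plays the role of manager() being called on every timer tick. Without the flag, each tick that fires before the previous query chain has completed would start another chain, which is presumably how the same rows could be picked up twice.

```perl
#!/usr/bin/perl
use strict;
use warnings;

my @pending;        # callbacks of simulated queries that have not completed yet
my $grabbing = 0;   # true while a fetch is in flight
my $fetches  = 0;   # how many fetches were actually started

# Hypothetical stand-in for an asynchronous $db->exec: it returns at once,
# and its callback runs later, when run_pending() is called.
sub fake_async_fetch {
    my ($cb) = @_;
    ++$fetches;
    push @pending, $cb;
    return;
}

# Simulate the event loop delivering completed queries.
sub run_pending {
    while (my $cb = shift @pending) {
        $cb->();
    }
    return;
}

# Called on every timer tick, like manager() in the answer.
sub poll {
    return if $grabbing;    # a fetch is already in flight; do not start another
    $grabbing = 1;
    fake_async_fetch(sub { $grabbing = 0 });
    return;
}

poll() for 1 .. 3;   # three ticks fire before the first query completes
run_pending();       # the query completes and clears the flag
poll();              # the next tick is allowed to fetch again
run_pending();

print "fetches started: $fetches\n";
```

Four ticks result in only two fetches, because the second and third ticks arrive while the first fetch is still pending.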
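Beyond the scope of the question:
- Used the shortcuts from AE.
- I used a more reliable lock value. It is now a string that even identifies the machine and process holding the lock.
- I added a field (status) to the jobs table to indicate the state of a job, rather than reusing the lock field.
Future work:
- Use AnyEvent::Filesys::Notify instead of a timer for the file check?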