如何处理队列中的所有项目并退出
How to process all items in queue and exit
我在一台运行 Ubuntu 的 LAMP 堆栈服务器上使用 Laravel 5。我正在使用队列系统:
http://laravel.com/docs/master/queues
从文档中可以看到,可以运行 queue:listen,它会一直监听,并处理队列中已有的和将来加入的项目。
还有 queue:work 只处理队列中的第一项。
有没有办法只处理队列中的每个项目然后停止监听?
我只想定期处理队列,那么如何设置一个 cron 作业来处理队列,然后在队列中的所有内容完成后立即退出?
我在命令文件中使用它:
// Resolve the queue connection by name; 'yourqueueconnection' is a placeholder
// to be replaced with the name of a connection configured in your application.
$queue = Queue::connection('yourqueueconnection');
// Pop entries one at a time; the loop ends as soon as pop() returns a falsy value.
while ($entry = $queue->pop()) {
// your task
// NOTE(review): presumably the popped job still has to be fired and deleted here,
// otherwise the driver may hand it out again later — confirm for your queue driver.
}
我也研究过这个问题。我修改了内置的 queue:work
命令,使其在处理完整个队列后退出。
php artisan make:console ProcessQueueAndExit
您可以在 https://gist.github.com/jdforsythe/b8c9bd46250ee23daa9de15d19495f07
获取代码,
为了便于长期保存,这里也附上一份:
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Carbon\Carbon;
use Illuminate\Queue\Worker;
use Illuminate\Contracts\Queue\Job;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputArgument;
/**
 * Artisan command that keeps asking the queue worker for jobs until the
 * worker reports that no job was returned, and then exits.
 *
 * Intended for periodic runs (for example from cron) where a permanently
 * listening worker is not wanted.
 */
class ProcessQueueAndExit extends Command
{
    /**
     * The console command signature.
     *
     * The numeric options carry explicit defaults so the worker never receives
     * null for them when they are omitted on the command line.
     * NOTE(review): --memory in particular is compared against current memory
     * usage by the daemon worker, so a null limit presumably stops the daemon
     * immediately — confirm against the installed Illuminate\Queue\Worker.
     * --sleep defaults to 0 so that the run is not delayed once the queue is empty.
     *
     * @var string
     */
    protected $signature = 'queue:workall {connection?} {--queue=} {--daemon} {--delay=0} {--force} {--memory=128} {--sleep=0} {--tries=0}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Process all jobs on a queue and exit';

    /**
     * The queue worker used to pop and process jobs.
     *
     * @var \Illuminate\Queue\Worker
     */
    protected $worker;

    /**
     * Create a new command instance.
     *
     * @param  \Illuminate\Queue\Worker  $worker
     */
    public function __construct(Worker $worker)
    {
        parent::__construct();

        $this->worker = $worker;
    }

    /**
     * Execute the command: process jobs until the worker returns no job.
     *
     * When the application is down for maintenance (and neither --force nor
     * --daemon is given) no job is processed; the worker just sleeps for the
     * configured --sleep duration and the command returns.
     *
     * @return mixed
     */
    public function handle()
    {
        if ($this->downForMaintenance() && ! $this->option('daemon')) {
            return $this->worker->sleep($this->option('sleep'));
        }

        $connection = $this->argument('connection');
        $queue = $this->option('queue');
        $delay = $this->option('delay');
        $memory = $this->option('memory');
        $daemon = $this->option('daemon');

        // Keep processing until there are no more jobs returned.
        do {
            $response = $this->runWorker($connection, $queue, $delay, $memory, $daemon);

            // Guard against a response that is not the expected array
            // (e.g. if the daemon call ever returns without a result).
            $job = isset($response['job']) ? $response['job'] : null;

            if (! is_null($job)) {
                $this->writeOutput($job, $response['failed']);
            }
        } while (! is_null($job));
    }

    /**
     * Run the worker once, either in daemon mode or as a single pop.
     *
     * NOTE(review): in daemon mode the worker presumably loops on its own and
     * does not return per job, so the "exit when empty" behaviour only applies
     * without --daemon — confirm against the installed Laravel version.
     *
     * @param  string|null  $connection
     * @param  string|null  $queue
     * @param  int|string   $delay
     * @param  int|string   $memory
     * @param  bool         $daemon
     * @return array|null   Worker response with 'job' and 'failed' keys.
     */
    protected function runWorker($connection, $queue, $delay, $memory, $daemon = false)
    {
        if ($daemon) {
            $this->worker->setCache($this->laravel['cache']->driver());

            $this->worker->setDaemonExceptionHandler(
                $this->laravel['Illuminate\Contracts\Debug\ExceptionHandler']
            );

            return $this->worker->daemon(
                $connection, $queue, $delay, $memory,
                $this->option('sleep'), $this->option('tries')
            );
        }

        return $this->worker->pop(
            $connection, $queue, $delay,
            $this->option('sleep'), $this->option('tries')
        );
    }

    /**
     * Write a timestamped status line for a job to the console.
     *
     * @param  \Illuminate\Contracts\Queue\Job  $job
     * @param  bool  $failed  Whether the job failed.
     * @return void
     */
    protected function writeOutput(Job $job, $failed)
    {
        $timestamp = Carbon::now()->format('Y-m-d H:i:s');

        if ($failed) {
            $this->output->writeln('<error>['.$timestamp.'] Failed:</error> '.$job->getName());

            return;
        }

        $this->output->writeln('<info>['.$timestamp.'] Processed:</info> '.$job->getName());
    }

    /**
     * Determine whether the command should treat the application as down.
     *
     * The --force option overrides maintenance mode.
     *
     * @return bool
     */
    protected function downForMaintenance()
    {
        if ($this->option('force')) {
            return false;
        }

        return $this->laravel->isDownForMaintenance();
    }
}
我在一台运行 Ubuntu 的 LAMP 堆栈服务器上使用 Laravel 5。我正在使用队列系统:
http://laravel.com/docs/master/queues
从文档中可以看到,可以运行 queue:listen,它会一直监听,并处理队列中已有的和将来加入的项目。
还有 queue:work 只处理队列中的第一项。
有没有办法只处理队列中的每个项目然后停止监听?
我只想定期处理队列,那么如何设置一个 cron 作业来处理队列,然后在队列中的所有内容完成后立即退出?
我在命令文件中使用它:
// Resolve the queue connection by name; 'yourqueueconnection' is a placeholder
// to be replaced with the name of a connection configured in your application.
$queue = Queue::connection('yourqueueconnection');
// Pop entries one at a time; the loop ends as soon as pop() returns a falsy value.
while ($entry = $queue->pop()) {
// your task
// NOTE(review): presumably the popped job still has to be fired and deleted here,
// otherwise the driver may hand it out again later — confirm for your queue driver.
}
我也研究过这个问题。我修改了内置的 queue:work
命令,使其在处理完整个队列后退出。
php artisan make:console ProcessQueueAndExit
您可以在 https://gist.github.com/jdforsythe/b8c9bd46250ee23daa9de15d19495f07
获取代码;为了便于长期保存,这里也附上一份:
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Carbon\Carbon;
use Illuminate\Queue\Worker;
use Illuminate\Contracts\Queue\Job;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputArgument;
/**
 * Artisan command that keeps asking the queue worker for jobs until the
 * worker reports that no job was returned, and then exits.
 *
 * Intended for periodic runs (for example from cron) where a permanently
 * listening worker is not wanted.
 */
class ProcessQueueAndExit extends Command
{
    /**
     * The console command signature.
     *
     * The numeric options carry explicit defaults so the worker never receives
     * null for them when they are omitted on the command line.
     * NOTE(review): --memory in particular is compared against current memory
     * usage by the daemon worker, so a null limit presumably stops the daemon
     * immediately — confirm against the installed Illuminate\Queue\Worker.
     * --sleep defaults to 0 so that the run is not delayed once the queue is empty.
     *
     * @var string
     */
    protected $signature = 'queue:workall {connection?} {--queue=} {--daemon} {--delay=0} {--force} {--memory=128} {--sleep=0} {--tries=0}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Process all jobs on a queue and exit';

    /**
     * The queue worker used to pop and process jobs.
     *
     * @var \Illuminate\Queue\Worker
     */
    protected $worker;

    /**
     * Create a new command instance.
     *
     * @param  \Illuminate\Queue\Worker  $worker
     */
    public function __construct(Worker $worker)
    {
        parent::__construct();

        $this->worker = $worker;
    }

    /**
     * Execute the command: process jobs until the worker returns no job.
     *
     * When the application is down for maintenance (and neither --force nor
     * --daemon is given) no job is processed; the worker just sleeps for the
     * configured --sleep duration and the command returns.
     *
     * @return mixed
     */
    public function handle()
    {
        if ($this->downForMaintenance() && ! $this->option('daemon')) {
            return $this->worker->sleep($this->option('sleep'));
        }

        $connection = $this->argument('connection');
        $queue = $this->option('queue');
        $delay = $this->option('delay');
        $memory = $this->option('memory');
        $daemon = $this->option('daemon');

        // Keep processing until there are no more jobs returned.
        do {
            $response = $this->runWorker($connection, $queue, $delay, $memory, $daemon);

            // Guard against a response that is not the expected array
            // (e.g. if the daemon call ever returns without a result).
            $job = isset($response['job']) ? $response['job'] : null;

            if (! is_null($job)) {
                $this->writeOutput($job, $response['failed']);
            }
        } while (! is_null($job));
    }

    /**
     * Run the worker once, either in daemon mode or as a single pop.
     *
     * NOTE(review): in daemon mode the worker presumably loops on its own and
     * does not return per job, so the "exit when empty" behaviour only applies
     * without --daemon — confirm against the installed Laravel version.
     *
     * @param  string|null  $connection
     * @param  string|null  $queue
     * @param  int|string   $delay
     * @param  int|string   $memory
     * @param  bool         $daemon
     * @return array|null   Worker response with 'job' and 'failed' keys.
     */
    protected function runWorker($connection, $queue, $delay, $memory, $daemon = false)
    {
        if ($daemon) {
            $this->worker->setCache($this->laravel['cache']->driver());

            $this->worker->setDaemonExceptionHandler(
                $this->laravel['Illuminate\Contracts\Debug\ExceptionHandler']
            );

            return $this->worker->daemon(
                $connection, $queue, $delay, $memory,
                $this->option('sleep'), $this->option('tries')
            );
        }

        return $this->worker->pop(
            $connection, $queue, $delay,
            $this->option('sleep'), $this->option('tries')
        );
    }

    /**
     * Write a timestamped status line for a job to the console.
     *
     * @param  \Illuminate\Contracts\Queue\Job  $job
     * @param  bool  $failed  Whether the job failed.
     * @return void
     */
    protected function writeOutput(Job $job, $failed)
    {
        $timestamp = Carbon::now()->format('Y-m-d H:i:s');

        if ($failed) {
            $this->output->writeln('<error>['.$timestamp.'] Failed:</error> '.$job->getName());

            return;
        }

        $this->output->writeln('<info>['.$timestamp.'] Processed:</info> '.$job->getName());
    }

    /**
     * Determine whether the command should treat the application as down.
     *
     * The --force option overrides maintenance mode.
     *
     * @return bool
     */
    protected function downForMaintenance()
    {
        if ($this->option('force')) {
            return false;
        }

        return $this->laravel->isDownForMaintenance();
    }
}