Laravel 中队列的 Beanstalkd 故障转移
Beanstalkd failover for queues in Laravel
有人知道 Laravel 队列的强大故障转移机制吗?
有一次我的 beanstalkd 服务器出现了某种错误(仍在弄清楚出了什么问题)在 Laravel (4) 队列驱动程序中触发了 Pheanstalk_Exception_ConnectionException
。因此,无法将新作业推送到队列。
我想要完成的是为 QueueInterface
提供某种故障转移驱动程序,它可以采用多个驱动程序实例,因此我可以定义例如 'sync' 或 'redis' 驱动程序作为故障转移队列。然后一旦 beanstalkd 失败,作业将由该驱动程序执行,并且不会丢失任何工作。
我只是想为你做一个非常小的尝试,希望它能给你一个想法……我认为每个人对队列都有不同的需求,所以这就是我所做的。
老实说,我只是剥离了一堆代码来尝试简化我所做的事情。我想试着从中弄明白。
在您的队列配置中:
'connections' => array(
'beanstalkd' => array(
'driver' => 'beanstalk_extended',
'host' => 'my.ip.address',
'queue' => 'default',
'ttr' => 60,
),
'my_fallback' => array(
'driver' => 'sync',
),
);
在 ServiceProvider@boot 中:
/**
* Boot the Beanstalkd queue.
*/
protected function bootQueue()
{
$this->app['queue']->extend('beanstalk_extended', function () {
return new BeanstalkConnector;
});
$this->app->bindShared('queue.failer', function($app) {
$config = $app['config']['queue.failed'];
return new DatabaseFailedJobProvider($app['db'], $config['database'], $config['table']);
});
$this->app->bindShared('queue.worker', function($app) {
return new Worker($app['queue'], $app['queue.failer'], $app['events']);
});
}
连接器:
<?php namespace App\Framework\Queue\Connectors;
use Illuminate\Queue\Connectors\ConnectorInterface;
use Pheanstalk_Pheanstalk as Pheanstalk;
use App\Framework\Queue\Beanstalk;
class BeanstalkConnector implements ConnectorInterface
{
/**
* Establish a queue connection.
*
* @param array $config
* @return \Illuminate\Queue\QueueInterface
*/
public function connect(array $config)
{
$pheanstalk = new Pheanstalk($config['host']);
$bean = new Beanstalk($pheanstalk, $config['queue'], array_get($config, 'ttr', Pheanstalk::DEFAULT_TTR));
return $bean;
}
}
然后在 Beanstalkd 扩展中:
/**
* Push a new job onto the queue.
*
* @param string $job
* @param mixed $data
* @param string $queue
* @return int
*/
public function push($job, $data = '', $queue = null)
{
try {
$queue = $this->getQueue($queue);
$id = parent::push($job, $data, $queue);
return $id;
} catch (\Exception $e) {
return \Queue::connection('my_fallback')->push($job, $data, $queue);
}
}
有人知道 Laravel 队列的强大故障转移机制吗?
有一次我的 beanstalkd 服务器出现了某种错误(仍在弄清楚出了什么问题)在 Laravel (4) 队列驱动程序中触发了 Pheanstalk_Exception_ConnectionException
。因此,无法将新作业推送到队列。
我想要完成的是为 QueueInterface
提供某种故障转移驱动程序,它可以采用多个驱动程序实例,因此我可以定义例如 'sync' 或 'redis' 驱动程序作为故障转移队列。然后一旦 beanstalkd 失败,作业将由该驱动程序执行,并且不会丢失任何工作。
我只是想为你做一个非常小的尝试,希望它能给你一个想法……我认为每个人对队列都有不同的需求,所以这就是我所做的。
老实说,我只是剥离了一堆代码来尝试简化我所做的事情。我想试着从中弄明白。
在您的队列配置中:
'connections' => array(
'beanstalkd' => array(
'driver' => 'beanstalk_extended',
'host' => 'my.ip.address',
'queue' => 'default',
'ttr' => 60,
),
'my_fallback' => array(
'driver' => 'sync',
),
);
在 ServiceProvider@boot 中:
/**
* Boot the Beanstalkd queue.
*/
protected function bootQueue()
{
$this->app['queue']->extend('beanstalk_extended', function () {
return new BeanstalkConnector;
});
$this->app->bindShared('queue.failer', function($app) {
$config = $app['config']['queue.failed'];
return new DatabaseFailedJobProvider($app['db'], $config['database'], $config['table']);
});
$this->app->bindShared('queue.worker', function($app) {
return new Worker($app['queue'], $app['queue.failer'], $app['events']);
});
}
连接器:
<?php namespace App\Framework\Queue\Connectors;
use Illuminate\Queue\Connectors\ConnectorInterface;
use Pheanstalk_Pheanstalk as Pheanstalk;
use App\Framework\Queue\Beanstalk;
class BeanstalkConnector implements ConnectorInterface
{
/**
* Establish a queue connection.
*
* @param array $config
* @return \Illuminate\Queue\QueueInterface
*/
public function connect(array $config)
{
$pheanstalk = new Pheanstalk($config['host']);
$bean = new Beanstalk($pheanstalk, $config['queue'], array_get($config, 'ttr', Pheanstalk::DEFAULT_TTR));
return $bean;
}
}
然后在 Beanstalkd 扩展中:
/**
* Push a new job onto the queue.
*
* @param string $job
* @param mixed $data
* @param string $queue
* @return int
*/
public function push($job, $data = '', $queue = null)
{
try {
$queue = $this->getQueue($queue);
$id = parent::push($job, $data, $queue);
return $id;
} catch (\Exception $e) {
return \Queue::connection('my_fallback')->push($job, $data, $queue);
}
}