How to switch the database connection for the job being processed in Laravel

I have a Laravel project that interacts with a React project through an API. It uses an RDS MySQL instance as its database.

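We have jobs, processed through SQS, that do some precomputation for users' social feeds. While these jobs are running, usage of the database instance spikes and eventually affects the live traffic connected to the same database instance.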

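I created a read replica of the original instance and want my jobs to connect to this read replica for all of their computation, so that essentially every query they run goes through the read replica connection.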

The queries I define in the job use the on function to specify which connection to use, i.e. Model::on('mysql_read_replica'), where mysql_read_replica is defined in config/database.php.
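For illustration, a named connection of this kind might be declared roughly as follows. This is only a sketch under assumptions: the DB_READ_REPLICA_HOST environment variable and the default values are hypothetical and not taken from the actual project.

```php
// config/database.php (excerpt) - a sketch, not the project's real config.
// DB_READ_REPLICA_HOST is a hypothetical environment variable name.
'connections' => [

    'mysql_read_replica' => [
        'driver' => 'mysql',
        // Endpoint of the RDS read replica instead of the primary instance.
        'host' => env('DB_READ_REPLICA_HOST', '127.0.0.1'),
        'port' => env('DB_PORT', '3306'),
        'database' => env('DB_DATABASE', 'forge'),
        'username' => env('DB_USERNAME', 'forge'),
        'password' => env('DB_PASSWORD', ''),
        'charset' => 'utf8mb4',
        'collation' => 'utf8mb4_unicode_ci',
    ],

],
```

A query started with Model::on('mysql_read_replica') then runs on this connection, but only that query does; anything that resolves the default connection on its own is unaffected.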

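However, custom attributes in the models and helper functions in the codebase continue to use the original connection. This keeps overloading the original instance, which affects live traffic.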

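I looked into this and came up with the following solution: I update the config so that the connection points at the read replica, and change it back at the end of the handle method.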

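public function handle() {
    // Keep both connection configs so the original can be restored afterwards.
    $mysql_rr = Config::get('database.connections.mysql_rr');
    $mysql = Config::get('database.connections.mysql');

    // Drop the cached 'mysql' connection, then point its config at the read replica.
    DB::purge('mysql');
    Config::set('database.connections.mysql', $mysql_rr);

    // job processing code

    // Drop the replica-backed connection and restore the original config.
    DB::purge('mysql');
    Config::set('database.connections.mysql', $mysql);
}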

I added an event listener callback and was able to verify that all queries are being run against the read replica instance:

\Event::listen('Illuminate\Database\Events\QueryExecuted', function ($query) {          
    $sql = $query->sql; 
    $time = $query->time;
    $connection = $query->connection->getName();
    $dbName = $query->connection->getDatabaseName();

    Log::info('job query : '.$sql);
    Log::info('job time '.$time);
    Log::info('job connection '.$connection);
    Log::info('job dbName '.$dbName);
});

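These jobs are unique per user, so the job has a UniqueJobs trait. The problem I now see is that once the job finishes executing, it cannot release the lock stored in the cache. The error log states: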

[2022-02-09 13:43:39] local.ERROR: Call to a member function prepare() on null {"exception":"[object] (Error(code: 0): Call to a member function prepare() on null at vendor\laravel\framework\src\Illuminate\Database\Connection.php:492)
[stacktrace]
#0 vendor\laravel\framework\src\Illuminate\Database\Connection.php(671): Illuminate\Database\Connection->Illuminate\Database\{closure}('delete from `ca...', Array)
#1 vendor\laravel\framework\src\Illuminate\Database\Connection.php(638): Illuminate\Database\Connection->runQueryCallback('delete from `ca...', Array, Object(Closure))
#2 vendor\laravel\framework\src\Illuminate\Database\Connection.php(503): Illuminate\Database\Connection->run('delete from `ca...', Array, Object(Closure))
#3 vendor\laravel\framework\src\Illuminate\Database\Connection.php(448): Illuminate\Database\Connection->affectingStatement('delete from `ca...', Array)
#4 vendor\laravel\framework\src\Illuminate\Database\Query\Builder.php(3043): Illuminate\Database\Connection->delete('delete from `ca...', Array)
#5 vendor\laravel\framework\src\Illuminate\Cache\DatabaseLock.php(127): Illuminate\Database\Query\Builder->delete()
#6 vendor\laravel\framework\src\Illuminate\Queue\CallQueuedHandler.php(211): Illuminate\Cache\DatabaseLock->forceRelease()
#7 vendor\laravel\framework\src\Illuminate\Queue\CallQueuedHandler.php(254): Illuminate\Queue\CallQueuedHandler->ensureUniqueJobLockIsReleased(Object(App\Jobs\GenerateDigest))
#8 vendor\laravel\framework\src\Illuminate\Queue\Jobs\Job.php(213): Illuminate\Queue\CallQueuedHandler->failed(Array, Object(Error), '634f580c-3a6f-4...')
#9 vendor\laravel\framework\src\Illuminate\Queue\Jobs\Job.php(192): Illuminate\Queue\Jobs\Job->failed(Object(Error))
#10 vendor\laravel\framework\src\Illuminate\Queue\Worker.php(548): Illuminate\Queue\Jobs\Job->fail(Object(Error))
#11 vendor\laravel\framework\src\Illuminate\Queue\Worker.php(509): Illuminate\Queue\Worker->failJob(Object(Illuminate\Queue\Jobs\SqsJob), Object(Error))
#12 vendor\laravel\framework\src\Illuminate\Queue\Worker.php(437): Illuminate\Queue\Worker->markJobAsFailedIfWillExceedMaxAttempts('sqs', Object(Illuminate\Queue\Jobs\SqsJob), 1, Object(Error))
#13 vendor\laravel\framework\src\Illuminate\Queue\Worker.php(414): Illuminate\Queue\Worker->handleJobException('sqs', Object(Illuminate\Queue\Jobs\SqsJob), Object(Illuminate\Queue\WorkerOptions), Object(Error))
#14 vendor\laravel\framework\src\Illuminate\Queue\Worker.php(360): Illuminate\Queue\Worker->process('sqs', Object(Illuminate\Queue\Jobs\SqsJob), Object(Illuminate\Queue\WorkerOptions))
#15 vendor\laravel\framework\src\Illuminate\Queue\Worker.php(311): Illuminate\Queue\Worker->runJob(Object(Illuminate\Queue\Jobs\SqsJob), 'sqs', Object(Illuminate\Queue\WorkerOptions))
#16 vendor\laravel\framework\src\Illuminate\Queue\Console\WorkCommand.php(117): Illuminate\Queue\Worker->runNextJob('sqs', 'sqs-generation-queue', Object(Illuminate\Queue\WorkerOptions))
#17 vendor\laravel\framework\src\Illuminate\Queue\Console\WorkCommand.php(101): Illuminate\Queue\Console\WorkCommand->runWorker('sqs', 'sqs-generation-queue')
#18 vendor\laravel\framework\src\Illuminate\Container\BoundMethod.php(36): Illuminate\Queue\Console\WorkCommand->handle()
#19 vendor\laravel\framework\src\Illuminate\Container\Util.php(40): Illuminate\Container\BoundMethod::Illuminate\Container\{closure}()
#20 vendor\laravel\framework\src\Illuminate\Container\BoundMethod.php(93): Illuminate\Container\Util::unwrapIfClosure(Object(Closure))
#21 vendor\laravel\framework\src\Illuminate\Container\BoundMethod.php(37): Illuminate\Container\BoundMethod::callBoundMethod(Object(Illuminate\Foundation\Application), Array, Object(Closure))
#22 vendor\laravel\framework\src\Illuminate\Container\Container.php(611): Illuminate\Container\BoundMethod::call(Object(Illuminate\Foundation\Application), Array, Array, NULL)
#23 vendor\laravel\framework\src\Illuminate\Console\Command.php(136): Illuminate\Container\Container->call(Array)
#24 vendor\symfony\console\Command\Command.php(255): Illuminate\Console\Command->execute(Object(Symfony\Component\Console\Input\ArgvInput), Object(Illuminate\Console\OutputStyle))
#25 vendor\laravel\framework\src\Illuminate\Console\Command.php(121): Symfony\Component\Console\Command\Command->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Illuminate\Console\OutputStyle))
#26 vendor\symfony\console\Application.php(971): Illuminate\Console\Command->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#27 vendor\symfony\console\Application.php(290): Symfony\Component\Console\Application->doRunCommand(Object(Illuminate\Queue\Console\WorkCommand), Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#28 vendor\symfony\console\Application.php(166): Symfony\Component\Console\Application->doRun(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#29 vendor\laravel\framework\src\Illuminate\Console\Application.php(92): Symfony\Component\Console\Application->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#30 vendor\laravel\framework\src\Illuminate\Foundation\Console\Kernel.php(129): Illuminate\Console\Application->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#31 artisan(37): Illuminate\Foundation\Console\Kernel->handle(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#32 {main}
"} 

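After adding some logging, I found that the getPdo() function in vendor\laravel\framework\src\Illuminate\Database\Connection.php is returning null, which is what the stack trace says. If I remove the code that switches the connection, the job processes fine and is able to release the lock.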

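Since we use database as our cache store, I set the connection of the database store in the cache config to a new connection that holds the connection details of the read-write instance.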

        'database' => [
            'driver' => 'database',
            'table' => 'cache',
            // use the read write database connection for cache operations
            'connection' => 'mysql_rw',
        ],

This lets the cache connect explicitly to the read-write instance for handling cache_locks, while still allowing the job to switch connections as needed.
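For completeness, the mysql_rw connection referenced above could be declared along these lines. This is a sketch under assumptions: it presumes the read-write instance is reachable through the usual DB_* environment variables, and the default values are placeholders.

```php
// config/database.php (excerpt) - a sketch; env names and defaults are assumptions.
'connections' => [

    // Dedicated connection that always points at the primary (read-write)
    // instance. The job only swaps the config stored under the 'mysql' key,
    // so this entry is not touched by that swap.
    'mysql_rw' => [
        'driver' => 'mysql',
        'host' => env('DB_HOST', '127.0.0.1'),
        'port' => env('DB_PORT', '3306'),
        'database' => env('DB_DATABASE', 'forge'),
        'username' => env('DB_USERNAME', 'forge'),
        'password' => env('DB_PASSWORD', ''),
        'charset' => 'utf8mb4',
        'collation' => 'utf8mb4_unicode_ci',
    ],

],
```

Giving the cache its own connection name means the lock release after the job no longer depends on the 'mysql' connection that the job purges.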