Laravel PHP 重新启动中断的作业
Laravel PHP restart interrupted job where left off
我有问题。我需要编写一个 Laravel 应用程序来读取 json 文件并将内容写入数据库。重要的是应用程序可以随时关闭(中断)并且应用程序需要在中断的地方继续。现在主要的写作部分是处理后台作业和批处理,如下所示:
控制器:
public function fileUploadPost(Request $request)
{
$data = json_decode(file_get_contents($request->file), true);
// Chunking file
$chunks = array_chunk($data, 1000);
// Create a background job foreach chunk
$path = storage_path('app/public/uploads');
$batch = Bus::batch([])->dispatch();
foreach ($chunks as $key => $chunk) {
$batch->add(new ProcessFile($chunk));
}
// Return success status
return back()->with('success','You have successfully uploaded the file.');
}
工作文件:
class ProcessFile implements ShouldQueue
{
use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
private $data = null;
/**
* ProcessFile constructor.
* @param array $data
*/
public function __construct(array $data)
{
$this->data = $data;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
foreach ($this->data as $value) {
$this->saveAccount($value);
}
}
public function failed(Throwable $exception) {
}
/**
* Store the given account data in the database using a transaction
*/
private function saveAccount(array $value) {
DB::transaction(function() use ($value) {
// Create Account object
$account = new Account;
$account->name=$value['name'];
$account->address=$value['address'];
$account->checked=$value['checked'];
$account->description=$value['description'];
$account->interest=$value['interest'];
$account->email=$value['email'];
$account->account=$value['account'];
$account->save();
});
}
}
现在,当我用这个应用程序杀死 docker 容器时,应用程序正在将数据写入数据库,然后再次重新启动 docker。我可以看到被中断的工作立即失败,但例外情况是:
Illuminate\Queue\MaxAttemptsExceededException: App\Jobs\ProcessFile has been attempted too many times or run too long. The job may have previously timed out. in /var/www/html/vendor/laravel/framework/src/Illuminate/Queue/Worker.php:750
Stack trace:
#0 /var/www/html/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(504): Illuminate\Queue\Worker->maxAttemptsExceededException(Object(Illuminate\Queue\Jobs\DatabaseJob))
#1 /var/www/html/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(418): Illuminate\Queue\Worker->markJobAsFailedIfAlreadyExceedsMaxAttempts('database', Object(Illuminate\Queue\Jobs\DatabaseJob), 1)
#2 /var/www/html/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(378): Illuminate\Queue\Worker->process('database', Object(Illuminate\Queue\Jobs\DatabaseJob), Object(Illuminate\Queue\WorkerOptions))
#3 /var/www/html/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(172): Illuminate\Queue\Worker->runJob(Object(Illuminate\Queue\Jobs\DatabaseJob), 'database', Object(Illuminate\Queue\WorkerOptions))
#4 /var/www/html/vendor/laravel/framework/src/Illuminate/Queue/Console/WorkCommand.php(130): Illuminate\Queue\Worker->daemon('database', 'default', Object(Illuminate\Queue\WorkerOptions))
#5 /var/www/html/vendor/laravel/framework/src/Illuminate/Queue/Console/WorkCommand.php(114): Illuminate\Queue\Console\WorkCommand->runWorker('database', 'default')
#6 /var/www/html/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(36): Illuminate\Queue\Console\WorkCommand->handle()
#7 /var/www/html/vendor/laravel/framework/src/Illuminate/Container/Util.php(41): Illuminate\Container\BoundMethod::Illuminate\Container\{closure}()
#8 /var/www/html/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(93): Illuminate\Container\Util::unwrapIfClosure(Object(Closure))
#9 /var/www/html/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(37): Illuminate\Container\BoundMethod::callBoundMethod(Object(Illuminate\Foundation\Application), Array, Object(Closure))
#10 /var/www/html/vendor/laravel/framework/src/Illuminate/Container/Container.php(651): Illuminate\Container\BoundMethod::call(Object(Illuminate\Foundation\Application), Array, Array, NULL)
#11 /var/www/html/vendor/laravel/framework/src/Illuminate/Console/Command.php(136): Illuminate\Container\Container->call(Array)
#12 /var/www/html/vendor/symfony/console/Command/Command.php(291): Illuminate\Console\Command->execute(Object(Symfony\Component\Console\Input\ArgvInput), Object(Illuminate\Console\OutputStyle))
#13 /var/www/html/vendor/laravel/framework/src/Illuminate/Console/Command.php(121): Symfony\Component\Console\Command\Command->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Illuminate\Console\OutputStyle))
#14 /var/www/html/vendor/symfony/console/Application.php(989): Illuminate\Console\Command->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#15 /var/www/html/vendor/symfony/console/Application.php(299): Symfony\Component\Console\Application->doRunCommand(Object(Illuminate\Queue\Console\WorkCommand), Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#16 /var/www/html/vendor/symfony/console/Application.php(171): Symfony\Component\Console\Application->doRun(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#17 /var/www/html/vendor/laravel/framework/src/Illuminate/Console/Application.php(102): Symfony\Component\Console\Application->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#18 /var/www/html/vendor/laravel/framework/src/Illuminate/Foundation/Console/Kernel.php(129): Illuminate\Console\Application->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#19 /var/www/html/artisan(37): Illuminate\Foundation\Console\Kernel->handle(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#20 {main}
当我重新启动失败的作业时,它会完全重新开始写入,所以我在数据库中得到了重复的条目。允许重复条目,因此忽略重复项的数据库解决方案是不可接受的。
为什么这项工作失败了,我该如何解决?
根据我对这类东西的经验,生成一个 uuid 并将其分配给放入队列中的作业。插入数据库时,包括请求 ID。通过这种方式,您可以避免将相同的请求记录到数据库两次,因为您会检查是否存在与请求 ID 关联的任何记录。
由于每个作业都会插入多条记录,你可以统计数据库中的记录数和作业中的记录数。如果匹配,则工作完成。如果没有匹配项,则还有更多工作要做。
请记住,在某些情况下,两个工人可能从事同一份工作,而您最终会重复工作。
因此,您可能对使用复合主键感兴趣,它将请求 ID 和请求中的项目编号串联起来。然后,即使工作在一个工作人员中“卡住”,而您有另一个工作人员试图填补空缺,因为他们会处理相同的请求 ID,他们不能同时写入同一个数据库相同请求 ID 和项目编号的相同记录。
记录有关请求状态的信息也是一个好主意,这样您就可以了解何时写入数据、剩余多少、百分比等以及何时完成。
这会很快变得复杂起来。根据情况,您可能已经有了额外的可用信息,例如每条记录的主键,您可以在插入数据库时利用这些信息。
另请记住,作业可能会失败。您可能会得到一个错误的工作,在它可能连续失败 3 次后您想要拒绝它,因此它不会阻止队列的其余部分,也不会移到队列的后面,并在几次后中止尝试。对这些进行适当的记录很重要,这样您就可以弄清楚发生了什么以及是否要对它们采取某种措施。这方面的一个例子是,如果数据库进入某种维护状态,运行 在它工作时连接中断,或者如果用户 ID 有问题等。
我有问题。我需要编写一个 Laravel 应用程序来读取 json 文件并将内容写入数据库。重要的是应用程序可以随时关闭(中断)并且应用程序需要在中断的地方继续。现在主要的写作部分是处理后台作业和批处理,如下所示:
控制器:
public function fileUploadPost(Request $request)
{
$data = json_decode(file_get_contents($request->file), true);
// Chunking file
$chunks = array_chunk($data, 1000);
// Create a background job foreach chunk
$path = storage_path('app/public/uploads');
$batch = Bus::batch([])->dispatch();
foreach ($chunks as $key => $chunk) {
$batch->add(new ProcessFile($chunk));
}
// Return success status
return back()->with('success','You have successfully uploaded the file.');
}
工作文件:
class ProcessFile implements ShouldQueue
{
use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
private $data = null;
/**
* ProcessFile constructor.
* @param array $data
*/
public function __construct(array $data)
{
$this->data = $data;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
foreach ($this->data as $value) {
$this->saveAccount($value);
}
}
public function failed(Throwable $exception) {
}
/**
* Store the given account data in the database using a transaction
*/
private function saveAccount(array $value) {
DB::transaction(function() use ($value) {
// Create Account object
$account = new Account;
$account->name=$value['name'];
$account->address=$value['address'];
$account->checked=$value['checked'];
$account->description=$value['description'];
$account->interest=$value['interest'];
$account->email=$value['email'];
$account->account=$value['account'];
$account->save();
});
}
}
现在,当我用这个应用程序杀死 docker 容器时,应用程序正在将数据写入数据库,然后再次重新启动 docker。我可以看到被中断的工作立即失败,但例外情况是:
Illuminate\Queue\MaxAttemptsExceededException: App\Jobs\ProcessFile has been attempted too many times or run too long. The job may have previously timed out. in /var/www/html/vendor/laravel/framework/src/Illuminate/Queue/Worker.php:750
Stack trace:
#0 /var/www/html/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(504): Illuminate\Queue\Worker->maxAttemptsExceededException(Object(Illuminate\Queue\Jobs\DatabaseJob))
#1 /var/www/html/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(418): Illuminate\Queue\Worker->markJobAsFailedIfAlreadyExceedsMaxAttempts('database', Object(Illuminate\Queue\Jobs\DatabaseJob), 1)
#2 /var/www/html/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(378): Illuminate\Queue\Worker->process('database', Object(Illuminate\Queue\Jobs\DatabaseJob), Object(Illuminate\Queue\WorkerOptions))
#3 /var/www/html/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(172): Illuminate\Queue\Worker->runJob(Object(Illuminate\Queue\Jobs\DatabaseJob), 'database', Object(Illuminate\Queue\WorkerOptions))
#4 /var/www/html/vendor/laravel/framework/src/Illuminate/Queue/Console/WorkCommand.php(130): Illuminate\Queue\Worker->daemon('database', 'default', Object(Illuminate\Queue\WorkerOptions))
#5 /var/www/html/vendor/laravel/framework/src/Illuminate/Queue/Console/WorkCommand.php(114): Illuminate\Queue\Console\WorkCommand->runWorker('database', 'default')
#6 /var/www/html/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(36): Illuminate\Queue\Console\WorkCommand->handle()
#7 /var/www/html/vendor/laravel/framework/src/Illuminate/Container/Util.php(41): Illuminate\Container\BoundMethod::Illuminate\Container\{closure}()
#8 /var/www/html/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(93): Illuminate\Container\Util::unwrapIfClosure(Object(Closure))
#9 /var/www/html/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(37): Illuminate\Container\BoundMethod::callBoundMethod(Object(Illuminate\Foundation\Application), Array, Object(Closure))
#10 /var/www/html/vendor/laravel/framework/src/Illuminate/Container/Container.php(651): Illuminate\Container\BoundMethod::call(Object(Illuminate\Foundation\Application), Array, Array, NULL)
#11 /var/www/html/vendor/laravel/framework/src/Illuminate/Console/Command.php(136): Illuminate\Container\Container->call(Array)
#12 /var/www/html/vendor/symfony/console/Command/Command.php(291): Illuminate\Console\Command->execute(Object(Symfony\Component\Console\Input\ArgvInput), Object(Illuminate\Console\OutputStyle))
#13 /var/www/html/vendor/laravel/framework/src/Illuminate/Console/Command.php(121): Symfony\Component\Console\Command\Command->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Illuminate\Console\OutputStyle))
#14 /var/www/html/vendor/symfony/console/Application.php(989): Illuminate\Console\Command->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#15 /var/www/html/vendor/symfony/console/Application.php(299): Symfony\Component\Console\Application->doRunCommand(Object(Illuminate\Queue\Console\WorkCommand), Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#16 /var/www/html/vendor/symfony/console/Application.php(171): Symfony\Component\Console\Application->doRun(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#17 /var/www/html/vendor/laravel/framework/src/Illuminate/Console/Application.php(102): Symfony\Component\Console\Application->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#18 /var/www/html/vendor/laravel/framework/src/Illuminate/Foundation/Console/Kernel.php(129): Illuminate\Console\Application->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#19 /var/www/html/artisan(37): Illuminate\Foundation\Console\Kernel->handle(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#20 {main}
当我重新启动失败的作业时,它会完全重新开始写入,所以我在数据库中得到了重复的条目。允许重复条目,因此忽略重复项的数据库解决方案是不可接受的。
为什么这项工作失败了,我该如何解决?
根据我对这类东西的经验,生成一个 uuid 并将其分配给放入队列中的作业。插入数据库时,包括请求 ID。通过这种方式,您可以避免将相同的请求记录到数据库两次,因为您会检查是否存在与请求 ID 关联的任何记录。
由于每个作业都会插入多条记录,你可以统计数据库中的记录数和作业中的记录数。如果匹配,则工作完成。如果没有匹配项,则还有更多工作要做。
请记住,在某些情况下,两个工人可能从事同一份工作,而您最终会重复工作。
因此,您可能对使用复合主键感兴趣,它将请求 ID 和请求中的项目编号串联起来。然后,即使工作在一个工作人员中“卡住”,而您有另一个工作人员试图填补空缺,因为他们会处理相同的请求 ID,他们不能同时写入同一个数据库相同请求 ID 和项目编号的相同记录。
记录有关请求状态的信息也是一个好主意,这样您就可以了解何时写入数据、剩余多少、百分比等以及何时完成。
这会很快变得复杂起来。根据情况,您可能已经有了额外的可用信息,例如每条记录的主键,您可以在插入数据库时利用这些信息。
另请记住,作业可能会失败。您可能会得到一个错误的工作,在它可能连续失败 3 次后您想要拒绝它,因此它不会阻止队列的其余部分,也不会移到队列的后面,并在几次后中止尝试。对这些进行适当的记录很重要,这样您就可以弄清楚发生了什么以及是否要对它们采取某种措施。这方面的一个例子是,如果数据库进入某种维护状态,运行 在它工作时连接中断,或者如果用户 ID 有问题等。