PHP 7 (Symfony4) 中的后端多线程
backend multi-threading in PHP 7 (Symfony4)
(我读了其他问题,但它们指的是旧版本的 PHP 或前端多线程)
我有一个 PHP/PostgreSQL 应用程序,它有一个复杂的后端处理部分。本质上,有一个非常大的循环(数千次迭代)一遍又一遍地遍历相同的数据(使用排列)。在每个循环中,读取相同的数据,应用操作,将结果写回数据库。循环彼此完全独立,循环之间不保留任何结果。事实上,为了清除对象缓存内存(使用 Doctrine),我每 100 次左右循环清除一次缓存。
所以我基本上有:
for ($i=0; $i<5000; $i++) {
// fetch data
// manipulate data
// write results to a different table
}
在这些循环中从未触及原始数据,只填充了几个结果表。
目前这需要几分钟时间。在我看来,我像是并行处理的教科书示例。
将其放入多个威胁中的最佳方法是什么?我不太关心执行顺序,或者即使工作负载均匀分布(根据数据操作的性质,如果所有线程 运行 相同数量的循环,它们最终应该或多或少地具有相同的工作负载).我只想使用更多 CPU 个内核。
我在 PHP 5 中完成了多线程处理,但它……好吧……并不完美。可行,但困难。这在 PHP 7 中有改善吗?有没有一种相对简单的方法来基本上说 "for (...) and run it in n threads" ?
以防万一,应用程序是用 Symfony4 编写的,这个后端进程是通过控制台命令调用的。
有 pthreads 扩展被重写为在 v3 中使用起来更简单。它在 PHP 7.2+ 上受支持,并提供了一种在 PHP.
中创建多线程应用程序的方法
或者,因为您使用的是 Symfony - 您可以编写简单的控制台命令,该命令可以将 Process
组件用于 运行 子进程作为单独的 OS 进程。这是来自实际项目的 运行ner 示例:
<?php
namespace App\Command;
use App\Command\Exception\StopCommandException;
use Symfony\Component\Console\Command\LockableTrait;
use Symfony\Component\Console\Exception\InvalidArgumentException;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\HttpKernel\KernelInterface;
use Symfony\Component\Process\Exception\RuntimeException;
use Symfony\Component\Process\PhpExecutableFinder;
use Symfony\Component\Process\Process;
use Webmozart\PathUtil\Path;
class ProcessingRunner extends AbstractCommand
{
use LockableTrait;
/**
* @var Process[]
*/
private $processes = [];
/**
* @var string[]
*/
private $cmd;
/**
* @var KernelInterface
*/
private $kernel;
/**
* @param KernelInterface $kernel
*/
public function __construct(KernelInterface $kernel)
{
parent::__construct();
$this->kernel = $kernel;
}
/**
* {@inheritdoc}
* @throws InvalidArgumentException
*/
protected function configure(): void
{
$this
->setName('app:processing:runner')
->setDescription('Run processing into multiple threads')
->addOption('threads', 't', InputOption::VALUE_REQUIRED, 'Number of threads to run at once', 1)
->addOption('at-once', 'm', InputOption::VALUE_REQUIRED, 'Amount of items to process at once', 10);
}
/**
* {@inheritdoc}
* @throws \Symfony\Component\Process\Exception\LogicException
* @throws InvalidArgumentException
* @throws RuntimeException
* @throws \Symfony\Component\DependencyInjection\Exception\InvalidArgumentException
* @throws \InvalidArgumentException
* @throws \LogicException
*/
protected function execute(InputInterface $input, OutputInterface $output): ?int
{
if (!$this->lock()) {
$output->writeln('The command is already running in another process.');
return 0;
}
if (extension_loaded('pcntl')) {
$stop = function () {
StopCommandException::throw();
};
pcntl_signal(SIGTERM, $stop);
pcntl_signal(SIGINT, $stop);
pcntl_async_signals(true);
}
do {
try {
while (\count($this->processes) < $this->getInput()->getOption('threads')) {
$process = $this->createProcess();
$process->start();
$this->processes[] = $process;
}
$this->processes = array_filter($this->processes, function (Process $p) {
return $p->isRunning();
});
usleep(1000);
} catch (StopCommandException $e) {
try {
defined('SIGKILL') || define('SIGKILL', 9);
array_map(function (Process $p) {
$p->signal(SIGKILL);
}, $this->processes);
} catch (\Throwable $e) {
}
break;
}
} while (true);
$this->release();
return 0;
}
/**
* @return Process
* @throws RuntimeException
* @throws \Symfony\Component\DependencyInjection\Exception\InvalidArgumentException
* @throws \InvalidArgumentException
* @throws \LogicException
* @throws InvalidArgumentException
*/
private function createProcess(): Process
{
if (!$this->cmd) {
$phpBinaryPath = (new PhpExecutableFinder())->find();
$this->cmd = [
$phpBinaryPath,
'-f',
Path::makeAbsolute('bin/console', $this->kernel->getProjectDir()),
'--',
'app:processing:worker',
'-e',
$this->kernel->getEnvironment(),
'-m',
$this->getInput()->getOption('at-once'),
];
}
return new Process($this->cmd);
}
}
(我读了其他问题,但它们指的是旧版本的 PHP 或前端多线程)
我有一个 PHP/PostgreSQL 应用程序,它有一个复杂的后端处理部分。本质上,有一个非常大的循环(数千次迭代)一遍又一遍地遍历相同的数据(使用排列)。在每个循环中,读取相同的数据,应用操作,将结果写回数据库。循环彼此完全独立,循环之间不保留任何结果。事实上,为了清除对象缓存内存(使用 Doctrine),我每 100 次左右循环清除一次缓存。
所以我基本上有:
for ($i=0; $i<5000; $i++) {
// fetch data
// manipulate data
// write results to a different table
}
在这些循环中从未触及原始数据,只填充了几个结果表。
目前这需要几分钟时间。在我看来,我像是并行处理的教科书示例。
将其放入多个威胁中的最佳方法是什么?我不太关心执行顺序,或者即使工作负载均匀分布(根据数据操作的性质,如果所有线程 运行 相同数量的循环,它们最终应该或多或少地具有相同的工作负载).我只想使用更多 CPU 个内核。
我在 PHP 5 中完成了多线程处理,但它……好吧……并不完美。可行,但困难。这在 PHP 7 中有改善吗?有没有一种相对简单的方法来基本上说 "for (...) and run it in n threads" ?
以防万一,应用程序是用 Symfony4 编写的,这个后端进程是通过控制台命令调用的。
有 pthreads 扩展被重写为在 v3 中使用起来更简单。它在 PHP 7.2+ 上受支持,并提供了一种在 PHP.
中创建多线程应用程序的方法或者,因为您使用的是 Symfony - 您可以编写简单的控制台命令,该命令可以将 Process
组件用于 运行 子进程作为单独的 OS 进程。这是来自实际项目的 运行ner 示例:
<?php
namespace App\Command;
use App\Command\Exception\StopCommandException;
use Symfony\Component\Console\Command\LockableTrait;
use Symfony\Component\Console\Exception\InvalidArgumentException;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\HttpKernel\KernelInterface;
use Symfony\Component\Process\Exception\RuntimeException;
use Symfony\Component\Process\PhpExecutableFinder;
use Symfony\Component\Process\Process;
use Webmozart\PathUtil\Path;
class ProcessingRunner extends AbstractCommand
{
use LockableTrait;
/**
* @var Process[]
*/
private $processes = [];
/**
* @var string[]
*/
private $cmd;
/**
* @var KernelInterface
*/
private $kernel;
/**
* @param KernelInterface $kernel
*/
public function __construct(KernelInterface $kernel)
{
parent::__construct();
$this->kernel = $kernel;
}
/**
* {@inheritdoc}
* @throws InvalidArgumentException
*/
protected function configure(): void
{
$this
->setName('app:processing:runner')
->setDescription('Run processing into multiple threads')
->addOption('threads', 't', InputOption::VALUE_REQUIRED, 'Number of threads to run at once', 1)
->addOption('at-once', 'm', InputOption::VALUE_REQUIRED, 'Amount of items to process at once', 10);
}
/**
* {@inheritdoc}
* @throws \Symfony\Component\Process\Exception\LogicException
* @throws InvalidArgumentException
* @throws RuntimeException
* @throws \Symfony\Component\DependencyInjection\Exception\InvalidArgumentException
* @throws \InvalidArgumentException
* @throws \LogicException
*/
protected function execute(InputInterface $input, OutputInterface $output): ?int
{
if (!$this->lock()) {
$output->writeln('The command is already running in another process.');
return 0;
}
if (extension_loaded('pcntl')) {
$stop = function () {
StopCommandException::throw();
};
pcntl_signal(SIGTERM, $stop);
pcntl_signal(SIGINT, $stop);
pcntl_async_signals(true);
}
do {
try {
while (\count($this->processes) < $this->getInput()->getOption('threads')) {
$process = $this->createProcess();
$process->start();
$this->processes[] = $process;
}
$this->processes = array_filter($this->processes, function (Process $p) {
return $p->isRunning();
});
usleep(1000);
} catch (StopCommandException $e) {
try {
defined('SIGKILL') || define('SIGKILL', 9);
array_map(function (Process $p) {
$p->signal(SIGKILL);
}, $this->processes);
} catch (\Throwable $e) {
}
break;
}
} while (true);
$this->release();
return 0;
}
/**
* @return Process
* @throws RuntimeException
* @throws \Symfony\Component\DependencyInjection\Exception\InvalidArgumentException
* @throws \InvalidArgumentException
* @throws \LogicException
* @throws InvalidArgumentException
*/
private function createProcess(): Process
{
if (!$this->cmd) {
$phpBinaryPath = (new PhpExecutableFinder())->find();
$this->cmd = [
$phpBinaryPath,
'-f',
Path::makeAbsolute('bin/console', $this->kernel->getProjectDir()),
'--',
'app:processing:worker',
'-e',
$this->kernel->getEnvironment(),
'-m',
$this->getInput()->getOption('at-once'),
];
}
return new Process($this->cmd);
}
}