PHP 7 (Symfony4) 中的后端多线程

backend multi-threading in PHP 7 (Symfony4)

(我读了其​​他问题,但它们指的是旧版本的 PHP 或前端多线程)

我有一个 PHP/PostgreSQL 应用程序,它有一个复杂的后端处理部分。本质上,有一个非常大的循环(数千次迭代)一遍又一遍地遍历相同的数据(使用排列)。在每个循环中,读取相同的数据,应用操作,将结果写回数据库。循环彼此完全独立,循环之间不保留任何结果。事实上,为了清除对象缓存内存(使用 Doctrine),我每 100 次左右循环清除一次缓存。

所以我基本上有:

for ($i=0; $i<5000; $i++) {
   // fetch data
   // manipulate data
   // write results to a different table
}

在这些循环中从未触及原始数据,只填充了几个结果表。

目前这需要几分钟时间。在我看来,我像是并行处理的教科书示例。

将其放入多个威胁中的最佳方法是什么?我不太关心执行顺序,或者即使工作负载均匀分布(根据数据操作的性质,如果所有线程 运行 相同数量的循环,它们最终应该或多或少地具有相同的工作负载).我只想使用更多 CPU 个内核。

我在 PHP 5 中完成了多线程处理,但它……好吧……并不完美。可行,但困难。这在 PHP 7 中有改善吗?有没有一种相对简单的方法来基本上说 "for (...) and run it in n threads" ?

以防万一,应用程序是用 Symfony4 编写的,这个后端进程是通过控制台命令调用的。

pthreads 扩展被重写为在 v3 中使用起来更简单。它在 PHP 7.2+ 上受支持,并提供了一种在 PHP.

中创建多线程应用程序的方法

或者,因为您使用的是 Symfony - 您可以编写简单的控制台命令,该命令可以将 Process 组件用于 运行 子进程作为单独的 OS 进程。这是来自实际项目的 运行ner 示例:

<?php

namespace App\Command;

use App\Command\Exception\StopCommandException;
use Symfony\Component\Console\Command\LockableTrait;
use Symfony\Component\Console\Exception\InvalidArgumentException;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\HttpKernel\KernelInterface;
use Symfony\Component\Process\Exception\RuntimeException;
use Symfony\Component\Process\PhpExecutableFinder;
use Symfony\Component\Process\Process;
use Webmozart\PathUtil\Path;

class ProcessingRunner extends AbstractCommand
{
    use LockableTrait;
    /**
     * @var Process[]
     */
    private $processes = [];
    /**
     * @var string[]
     */
    private $cmd;
    /**
     * @var KernelInterface
     */
    private $kernel;

    /**
     * @param KernelInterface $kernel
     */
    public function __construct(KernelInterface $kernel)
    {
        parent::__construct();
        $this->kernel = $kernel;
    }

    /**
     * {@inheritdoc}
     * @throws InvalidArgumentException
     */
    protected function configure(): void
    {
        $this
            ->setName('app:processing:runner')
            ->setDescription('Run processing into multiple threads')
            ->addOption('threads', 't', InputOption::VALUE_REQUIRED, 'Number of threads to run at once', 1)
            ->addOption('at-once', 'm', InputOption::VALUE_REQUIRED, 'Amount of items to process at once', 10);
    }

    /**
     * {@inheritdoc}
     * @throws \Symfony\Component\Process\Exception\LogicException
     * @throws InvalidArgumentException
     * @throws RuntimeException
     * @throws \Symfony\Component\DependencyInjection\Exception\InvalidArgumentException
     * @throws \InvalidArgumentException
     * @throws \LogicException
     */
    protected function execute(InputInterface $input, OutputInterface $output): ?int
    {
        if (!$this->lock()) {
            $output->writeln('The command is already running in another process.');
            return 0;
        }
        if (extension_loaded('pcntl')) {
            $stop = function () {
                StopCommandException::throw();
            };
            pcntl_signal(SIGTERM, $stop);
            pcntl_signal(SIGINT, $stop);
            pcntl_async_signals(true);
        }
        do {
            try {
                while (\count($this->processes) < $this->getInput()->getOption('threads')) {
                    $process = $this->createProcess();
                    $process->start();
                    $this->processes[] = $process;
                }
                $this->processes = array_filter($this->processes, function (Process $p) {
                    return $p->isRunning();
                });
                usleep(1000);
            } catch (StopCommandException $e) {
                try {
                    defined('SIGKILL') || define('SIGKILL', 9);
                    array_map(function (Process $p) {
                        $p->signal(SIGKILL);
                    }, $this->processes);
                } catch (\Throwable $e) {

                }
                break;
            }
        } while (true);
        $this->release();
        return 0;
    }

    /**
     * @return Process
     * @throws RuntimeException
     * @throws \Symfony\Component\DependencyInjection\Exception\InvalidArgumentException
     * @throws \InvalidArgumentException
     * @throws \LogicException
     * @throws InvalidArgumentException
     */
    private function createProcess(): Process
    {
        if (!$this->cmd) {
            $phpBinaryPath = (new PhpExecutableFinder())->find();
            $this->cmd = [
                $phpBinaryPath,
                '-f',
                Path::makeAbsolute('bin/console', $this->kernel->getProjectDir()),
                '--',
                'app:processing:worker',
                '-e',
                $this->kernel->getEnvironment(),
                '-m',
                $this->getInput()->getOption('at-once'),
            ];
        }
        return new Process($this->cmd);
    }
}