在 PHP 中卸载一次性工作线程的最佳方法?线程? fcntl?
Best way to offload one-shot worker threads in PHP? pthreads? fcntl?
我应该如何对一些需要超时的 php-cli 代码进行多线程处理?
我在 Centos 6.6 上从命令行使用 PHP 5.6。
我对多线程术语或代码不是很熟悉。我将在这里简化代码,但它 100% 代表了我想做的事情。
非线程代码目前看起来像这样:
$datasets = MyLibrary::getAllRawDataFromDBasArrays();
foreach ($datasets as $dataset) {
MyLibrary::processRawDataAndStoreResultInDB($dataset);
}
exit; // just for clarity
我需要预取我所有的数据集,每个 processRawDataAndStoreResultInDB() 都无法获取它自己的数据集。有时 processRawDataAndStoreResultInDB() 处理数据集的时间太长,所以我想限制它处理数据集的时间。
所以你可以看到让它成为多线程会
- 通过允许多个 processRawDataAndStoreResultInDB() 同时执行来加快速度
- 使用 set_time_limit() 来限制每个人处理每个数据集的时间
请注意,我不需要返回到我的主程序。由于这是一种简化,您可以相信我不想收集所有处理过的数据集并在完成后将它们保存到数据库中。
我想做类似的事情:
class MyWorkerThread extends SomeThreadType {
public function __construct($timeout, $dataset) {
$this->timeout = $timeout;
$this->dataset = $dataset;
}
public function run() {
set_time_limit($this->timeout);
MyLibrary::processRawDataAndStoreResultInDB($this->dataset);
}
}
$numberOfThreads = 4;
$pool = somePoolClass($numberOfThreads);
$pool->start();
$datasets = MyLibrary::getAllRawDataFromDBasArrays();
$timeoutForEachThread = 5; // seconds
foreach ($datasets as $dataset) {
$thread = new MyWorkerThread($timeoutForEachThread, $dataset);
$thread->addCallbackOnTerminated(function() {
if ($this->isTimeout()) {
MyLibrary::saveBadDatasetToDb($dataset);
}
}
$pool->addToQueue($thread);
}
$pool->waitUntilAllWorkersAreFinished();
exit; // for clarity
根据我的在线研究,我发现了 PHP 扩展 pthreads,我可以将其与我的线程安全 php CLI 一起使用,或者我可以使用 PCNTL 扩展或围绕它的包装库(比如说,Arara/Process)
- https://github.com/krakjoe/pthreads(和示例目录)
- https://github.com/Arara/Process(pcntl 包装器)
当我查看它们和它们的示例时(尤其是 pthreads 池示例),我很快就被术语弄糊涂了,我应该使用哪些 classes 来实现我正在寻找的多线程.
我什至不介意自己创建池 class,如果我在线程上有 isRunning()、isTerminated()、getTerminationStatus() 和 execute() 函数 class,因为这将是一个简单的队列。
有更多经验的人可以告诉我应该使用哪个库、classes 和函数来映射到我上面的示例吗?我是否完全采用了错误的方法?
提前致谢。
下面是一个使用工作进程的例子。我正在使用 pcntl 扩展。
/**
* Spawns a worker process and returns it pid or -1
* if something goes wrong.
*
* @param callback function, closure or method to call
* @return integer
*/
function worker($callback) {
$pid = pcntl_fork();
if($pid === 0) {
// Child process
exit($callback());
} else {
// Main process or an error
return $pid;
}
}
$datasets = array(
array('test', '123'),
array('foo', 'bar')
);
$maxWorkers = 1;
$numWorkers = 0;
foreach($datasets as $dataset) {
$pid = worker(function () use ($dataset) {
// Do DB stuff here
var_dump($dataset);
return 0;
});
if($pid !== -1) {
$numWorkers++;
} else {
// Handle fork errors here
echo 'Failed to spawn worker';
}
// If $maxWorkers is reached we need to wait
// for at least one child to return
if($numWorkers === $maxWorkers) {
// $status is passed by reference
$pid = pcntl_wait($status);
echo "child process $pid returned $status\n";
$numWorkers--;
}
}
// (Non blocking) wait for the remaining childs
while(true) {
// $status is passed by reference
$pid = pcntl_wait($status, WNOHANG);
if(is_null($pid) || $pid === -1) {
break;
}
if($pid === 0) {
// Be patient ...
usleep(50000);
continue;
}
echo "child process $pid returned $status\n";
}
我应该如何对一些需要超时的 php-cli 代码进行多线程处理?
我在 Centos 6.6 上从命令行使用 PHP 5.6。
我对多线程术语或代码不是很熟悉。我将在这里简化代码,但它 100% 代表了我想做的事情。
非线程代码目前看起来像这样:
$datasets = MyLibrary::getAllRawDataFromDBasArrays();
foreach ($datasets as $dataset) {
MyLibrary::processRawDataAndStoreResultInDB($dataset);
}
exit; // just for clarity
我需要预取我所有的数据集,每个 processRawDataAndStoreResultInDB() 都无法获取它自己的数据集。有时 processRawDataAndStoreResultInDB() 处理数据集的时间太长,所以我想限制它处理数据集的时间。
所以你可以看到让它成为多线程会
- 通过允许多个 processRawDataAndStoreResultInDB() 同时执行来加快速度
- 使用 set_time_limit() 来限制每个人处理每个数据集的时间
请注意,我不需要返回到我的主程序。由于这是一种简化,您可以相信我不想收集所有处理过的数据集并在完成后将它们保存到数据库中。
我想做类似的事情:
class MyWorkerThread extends SomeThreadType {
public function __construct($timeout, $dataset) {
$this->timeout = $timeout;
$this->dataset = $dataset;
}
public function run() {
set_time_limit($this->timeout);
MyLibrary::processRawDataAndStoreResultInDB($this->dataset);
}
}
$numberOfThreads = 4;
$pool = somePoolClass($numberOfThreads);
$pool->start();
$datasets = MyLibrary::getAllRawDataFromDBasArrays();
$timeoutForEachThread = 5; // seconds
foreach ($datasets as $dataset) {
$thread = new MyWorkerThread($timeoutForEachThread, $dataset);
$thread->addCallbackOnTerminated(function() {
if ($this->isTimeout()) {
MyLibrary::saveBadDatasetToDb($dataset);
}
}
$pool->addToQueue($thread);
}
$pool->waitUntilAllWorkersAreFinished();
exit; // for clarity
根据我的在线研究,我发现了 PHP 扩展 pthreads,我可以将其与我的线程安全 php CLI 一起使用,或者我可以使用 PCNTL 扩展或围绕它的包装库(比如说,Arara/Process)
- https://github.com/krakjoe/pthreads(和示例目录)
- https://github.com/Arara/Process(pcntl 包装器)
当我查看它们和它们的示例时(尤其是 pthreads 池示例),我很快就被术语弄糊涂了,我应该使用哪些 classes 来实现我正在寻找的多线程.
我什至不介意自己创建池 class,如果我在线程上有 isRunning()、isTerminated()、getTerminationStatus() 和 execute() 函数 class,因为这将是一个简单的队列。
有更多经验的人可以告诉我应该使用哪个库、classes 和函数来映射到我上面的示例吗?我是否完全采用了错误的方法?
提前致谢。
下面是一个使用工作进程的例子。我正在使用 pcntl 扩展。
/**
* Spawns a worker process and returns it pid or -1
* if something goes wrong.
*
* @param callback function, closure or method to call
* @return integer
*/
function worker($callback) {
$pid = pcntl_fork();
if($pid === 0) {
// Child process
exit($callback());
} else {
// Main process or an error
return $pid;
}
}
$datasets = array(
array('test', '123'),
array('foo', 'bar')
);
$maxWorkers = 1;
$numWorkers = 0;
foreach($datasets as $dataset) {
$pid = worker(function () use ($dataset) {
// Do DB stuff here
var_dump($dataset);
return 0;
});
if($pid !== -1) {
$numWorkers++;
} else {
// Handle fork errors here
echo 'Failed to spawn worker';
}
// If $maxWorkers is reached we need to wait
// for at least one child to return
if($numWorkers === $maxWorkers) {
// $status is passed by reference
$pid = pcntl_wait($status);
echo "child process $pid returned $status\n";
$numWorkers--;
}
}
// (Non blocking) wait for the remaining childs
while(true) {
// $status is passed by reference
$pid = pcntl_wait($status, WNOHANG);
if(is_null($pid) || $pid === -1) {
break;
}
if($pid === 0) {
// Be patient ...
usleep(50000);
continue;
}
echo "child process $pid returned $status\n";
}