AMPHP - 排队的任务多于池中的可用工作人员
AMPHP - Queueing more Tasks than available Workers in Pool
我有一个项目,我正在将大量 .tif 图像转换为 PDF 文档。文件数达到数百万。
为了加快我使用 Amphp 的过程。由于使用 Imagemagick 转换图像的过程会占用一些 cpu 功率,因此我想限制并行 运行 转换过程的最大数量。
我的第一种方法可行,但如果我对文件进行排队而不是为一定数量的工作人员提供 x 文件数组,则可以改进。
这是我当前的代码,我试图在其中复制 the example。
<?php
require dirname(__DIR__) . '/vendor/autoload.php';
$constants = get_defined_constants(true);
$constants = $constants['user'];
$maxFileCount = THREAD_CHUNKSIZE * THREAD_COUNT;
$i = 0;
$folder = opendir(LOOKUP_PATH);
$tasks = [];
while ($i < $maxFileCount && (false !== ($import_file = readdir($folder)))) {
$fileParts = explode('.', $import_file);
$ext = strtolower(end($fileParts));
if($ext === 'xml') {
$filePath = LOOKUP_PATH. 'xml'.DIRECTORY_SEPARATOR.$import_file;
$tasks[] = new ConvertPdfTask([$filePath], $constants);
}
$i++;
}
if(!empty($tasks)) {
Amp\Loop::run(function () use ($tasks) {
$coroutines = [];
$pool = new Amp\Parallel\Worker\DefaultPool(THREAD_COUNT);
foreach ($tasks as $index => $task) {
$coroutines[] = Amp\call(function() use ($pool, $task) {
return yield $pool->enqueue($task);
});
}
$results = yield Amp\Promise\all($coroutines);
return yield $pool->shutdown();
});
}
我的问题是,一旦我排队超过 THREAD_COUNT
数量的任务,我就会收到以下 PHP 警告:Warning: Worker in pool exited unexpectedly with code -1
并且没有创建任何 PDF。
只要我保持在最大池大小以下,一切都很好。
我在 Windows 10 和 amphp/parallel 1.4.0 上使用 PHP 7.4.9。
经过更多试验后,我找到了一个解决方案,似乎可行。
感觉有点“hacky”,所以如果有人有更好的想法,请分享。我以为池会自动建立一个队列,然后由最大数量的工作人员处理,但似乎并非如此。
我现在将从 Amp\call
获得的协程保存在两个单独的数组中。一个包含所有协程,一个包含当前循环的所有协程。
$coroutine = Amp\call(function () use ($pool, $task) {
return yield $pool->enqueue($task);
});
$loopRoutines[] = $coroutine;
$allCoroutines[] = $coroutine;
将项目入队后,我检查是否已达到配置线程的最大数量。如果池中有最大数量的工作人员并且没有空闲工作人员,我会在我的 current-loop 协同程序上调用 Amp\Promise\first
函数来等待新的空闲空闲工作人员。
因为下次我到达那里时该函数会立即 return(因为完成的协程仍然在我的 current-loop 数组中),所以我清除了数组。
if ($pool->getWorkerCount() >= (THREAD_COUNT) && $pool->getIdleWorkerCount() === 0) {
yield Amp\Promise\first($loopRoutines);
$loopRoutines = [];
}
foreach 之后,我在我的 all-coroutines 数组上调用 Amp\Promise\all
,因此脚本会一直等到所有 worker 都完成。
这是我修改后的代码:
<?php
require dirname(__DIR__) . '/vendor/autoload.php';
$constants = get_defined_constants(true);
$constants = $constants['user'];
$maxFileCount = THREAD_CHUNKSIZE * THREAD_COUNT;
$i = 0;
$folder = opendir(LOOKUP_PATH);
$tasks = [];
while ($i < $maxFileCount && (false !== ($import_file = readdir($folder)))) {
$fileParts = explode('.', $import_file);
$ext = strtolower(end($fileParts));
if($ext === 'xml') {
$filePath = LOOKUP_PATH. 'xml'.DIRECTORY_SEPARATOR.$import_file;
$tasks[] = new ConvertPdfTask([$filePath], $constants);
}
$i++;
}
if(!empty($tasks)) {
Amp\Loop::run(function () use ($tasks) {
$allCoroutines = [];
$loopRoutines = [];
$pool = new Amp\Parallel\Worker\DefaultPool(THREAD_COUNT);
foreach ($tasks as $index => $task) {
$coroutine = Amp\call(function () use ($pool, $task) {
return yield $pool->enqueue($task);
});
$loopRoutines[] = $coroutine;
$allCoroutines[] = $coroutine;
if ($pool->getWorkerCount() >= THREAD_COUNT && $pool->getIdleWorkerCount() === 0) {
yield Amp\Promise\first($loopRoutines);
$loopRoutines = [];
}
}
yield Amp\Promise\all($allCoroutines);
return yield $pool->shutdown();
});
}
我有一个项目,我正在将大量 .tif 图像转换为 PDF 文档。文件数达到数百万。
为了加快我使用 Amphp 的过程。由于使用 Imagemagick 转换图像的过程会占用一些 cpu 功率,因此我想限制并行 运行 转换过程的最大数量。
我的第一种方法可行,但如果我对文件进行排队而不是为一定数量的工作人员提供 x 文件数组,则可以改进。
这是我当前的代码,我试图在其中复制 the example。
<?php
require dirname(__DIR__) . '/vendor/autoload.php';
$constants = get_defined_constants(true);
$constants = $constants['user'];
$maxFileCount = THREAD_CHUNKSIZE * THREAD_COUNT;
$i = 0;
$folder = opendir(LOOKUP_PATH);
$tasks = [];
while ($i < $maxFileCount && (false !== ($import_file = readdir($folder)))) {
$fileParts = explode('.', $import_file);
$ext = strtolower(end($fileParts));
if($ext === 'xml') {
$filePath = LOOKUP_PATH. 'xml'.DIRECTORY_SEPARATOR.$import_file;
$tasks[] = new ConvertPdfTask([$filePath], $constants);
}
$i++;
}
if(!empty($tasks)) {
Amp\Loop::run(function () use ($tasks) {
$coroutines = [];
$pool = new Amp\Parallel\Worker\DefaultPool(THREAD_COUNT);
foreach ($tasks as $index => $task) {
$coroutines[] = Amp\call(function() use ($pool, $task) {
return yield $pool->enqueue($task);
});
}
$results = yield Amp\Promise\all($coroutines);
return yield $pool->shutdown();
});
}
我的问题是,一旦我排队超过 THREAD_COUNT
数量的任务,我就会收到以下 PHP 警告:Warning: Worker in pool exited unexpectedly with code -1
并且没有创建任何 PDF。
只要我保持在最大池大小以下,一切都很好。
我在 Windows 10 和 amphp/parallel 1.4.0 上使用 PHP 7.4.9。
经过更多试验后,我找到了一个解决方案,似乎可行。 感觉有点“hacky”,所以如果有人有更好的想法,请分享。我以为池会自动建立一个队列,然后由最大数量的工作人员处理,但似乎并非如此。
我现在将从 Amp\call
获得的协程保存在两个单独的数组中。一个包含所有协程,一个包含当前循环的所有协程。
$coroutine = Amp\call(function () use ($pool, $task) {
return yield $pool->enqueue($task);
});
$loopRoutines[] = $coroutine;
$allCoroutines[] = $coroutine;
将项目入队后,我检查是否已达到配置线程的最大数量。如果池中有最大数量的工作人员并且没有空闲工作人员,我会在我的 current-loop 协同程序上调用 Amp\Promise\first
函数来等待新的空闲空闲工作人员。
因为下次我到达那里时该函数会立即 return(因为完成的协程仍然在我的 current-loop 数组中),所以我清除了数组。
if ($pool->getWorkerCount() >= (THREAD_COUNT) && $pool->getIdleWorkerCount() === 0) {
yield Amp\Promise\first($loopRoutines);
$loopRoutines = [];
}
foreach 之后,我在我的 all-coroutines 数组上调用 Amp\Promise\all
,因此脚本会一直等到所有 worker 都完成。
这是我修改后的代码:
<?php
require dirname(__DIR__) . '/vendor/autoload.php';
$constants = get_defined_constants(true);
$constants = $constants['user'];
$maxFileCount = THREAD_CHUNKSIZE * THREAD_COUNT;
$i = 0;
$folder = opendir(LOOKUP_PATH);
$tasks = [];
while ($i < $maxFileCount && (false !== ($import_file = readdir($folder)))) {
$fileParts = explode('.', $import_file);
$ext = strtolower(end($fileParts));
if($ext === 'xml') {
$filePath = LOOKUP_PATH. 'xml'.DIRECTORY_SEPARATOR.$import_file;
$tasks[] = new ConvertPdfTask([$filePath], $constants);
}
$i++;
}
if(!empty($tasks)) {
Amp\Loop::run(function () use ($tasks) {
$allCoroutines = [];
$loopRoutines = [];
$pool = new Amp\Parallel\Worker\DefaultPool(THREAD_COUNT);
foreach ($tasks as $index => $task) {
$coroutine = Amp\call(function () use ($pool, $task) {
return yield $pool->enqueue($task);
});
$loopRoutines[] = $coroutine;
$allCoroutines[] = $coroutine;
if ($pool->getWorkerCount() >= THREAD_COUNT && $pool->getIdleWorkerCount() === 0) {
yield Amp\Promise\first($loopRoutines);
$loopRoutines = [];
}
}
yield Amp\Promise\all($allCoroutines);
return yield $pool->shutdown();
});
}