如何在 PHP 中对 Fibers 使用多个异步 fread?
How to use multiple async fread with Fibers in PHP?
我想使用 fread 和 Fibers 从列表中的每个 url 获取内容,其中每个流不需要等待 feof 到 运行 另一个 [=22= 中的另一个 fread ]
我目前的代码如下:
<?php
function getFiberFromStream($stream, $url): Fiber {
return new Fiber(function ($stream) use ($url): void {
while (!feof($stream)) {
echo "reading 100 bytes from $url".PHP_EOL;
$contents = fread($stream, 100);
Fiber::suspend($contents);
}
});
}
function getContents(array $urls): array {
$contents = [];
foreach ($urls as $key => $url) {
$stream = fopen($url, 'r');
stream_set_blocking($stream, false);
$fiber = getFiberFromStream($stream, $url);
$content = $fiber->start($stream);
while (!$fiber->isTerminated()) {
$content .= $fiber->resume();
}
fclose($stream);
$contents[$urls[$key]] = $content;
}
return $contents;
}
$urls = [
'https://www.google.com/',
'https://www.twitter.com',
'https://www.facebook.com'
];
var_dump(getContents($urls));
不幸的是,getFiberFromStream() 中使用的回显显示当前代码正在等待从 url 获取全部内容以转到下一个:
reading 100 bytes from https://www.google.com
reading 100 bytes from https://www.google.com
reading 100 bytes from https://www.google.com //finished
reading 100 bytes from https://www.twitter.com
reading 100 bytes from https://www.twitter.com
reading 100 bytes from https://www.twitter.com //finished
reading 100 bytes from https://www.facebook.com
[...]
我想要这样的东西:
reading 100 bytes from https://www.google.com
reading 100 bytes from https://www.twitter.com
reading 100 bytes from https://www.facebook.com
reading 100 bytes from https://www.google.com
reading 100 bytes from https://www.twitter.com
reading 100 bytes from https://www.facebook.com
[...]
您看到的行为是因为您轮询当前纤程直到完全完成,然后再转到下一个纤程。
这里的解决方案是同时启动所有 url 的所有纤程,然后才对它们进行轮询。
尝试这样的事情:
function getContents(array $urls): array {
$contents = [];
$fibers = [];
// start them all up
foreach ($urls as $key => $url) {
$stream = fopen($url, 'r');
stream_set_blocking($stream, false);
$fiber = getFiberFromStream($stream, $url);
$content = $fiber->start($stream);
// save fiber context so we can process them later
$fibers[$key] = [$fiber, $content, $stream];
}
// now poll
$have_unterminated_fibers = true;
while ($have_unterminated_fibers) {
// first suppose we have no work to do
$have_unterminated_fibers = false;
// now loop over fibers to see if any is still working
foreach ($fibers as $key => $item) {
// fetch context
$fiber = $item[0];
$content = $item[1];
$stream = $item[2];
// don't do while till the end here,
// just process next chunk
if (!$fiber->isTerminated()) {
// yep, mark we still have some work left
$have_unterminated_fibers = true;
// update content in the context
$content .= $fiber->resume();
$fibers[$key][1] = $content;
} else {
if ($stream) {
fclose($stream);
// save result for return
$contents[$urls[$key]] = $content;
// mark stream as closed in context
// so it don't close twice
$fibers[$key][2] = null;
}
}
}
}
return $contents;
}
我想使用 fread 和 Fibers 从列表中的每个 url 获取内容,其中每个流不需要等待 feof 到 运行 另一个 [=22= 中的另一个 fread ]
我目前的代码如下:
<?php
function getFiberFromStream($stream, $url): Fiber {
return new Fiber(function ($stream) use ($url): void {
while (!feof($stream)) {
echo "reading 100 bytes from $url".PHP_EOL;
$contents = fread($stream, 100);
Fiber::suspend($contents);
}
});
}
function getContents(array $urls): array {
$contents = [];
foreach ($urls as $key => $url) {
$stream = fopen($url, 'r');
stream_set_blocking($stream, false);
$fiber = getFiberFromStream($stream, $url);
$content = $fiber->start($stream);
while (!$fiber->isTerminated()) {
$content .= $fiber->resume();
}
fclose($stream);
$contents[$urls[$key]] = $content;
}
return $contents;
}
$urls = [
'https://www.google.com/',
'https://www.twitter.com',
'https://www.facebook.com'
];
var_dump(getContents($urls));
不幸的是,getFiberFromStream() 中使用的回显显示当前代码正在等待从 url 获取全部内容以转到下一个:
reading 100 bytes from https://www.google.com
reading 100 bytes from https://www.google.com
reading 100 bytes from https://www.google.com //finished
reading 100 bytes from https://www.twitter.com
reading 100 bytes from https://www.twitter.com
reading 100 bytes from https://www.twitter.com //finished
reading 100 bytes from https://www.facebook.com
[...]
我想要这样的东西:
reading 100 bytes from https://www.google.com
reading 100 bytes from https://www.twitter.com
reading 100 bytes from https://www.facebook.com
reading 100 bytes from https://www.google.com
reading 100 bytes from https://www.twitter.com
reading 100 bytes from https://www.facebook.com
[...]
您看到的行为是因为您轮询当前纤程直到完全完成,然后再转到下一个纤程。
这里的解决方案是同时启动所有 url 的所有纤程,然后才对它们进行轮询。
尝试这样的事情:
function getContents(array $urls): array {
$contents = [];
$fibers = [];
// start them all up
foreach ($urls as $key => $url) {
$stream = fopen($url, 'r');
stream_set_blocking($stream, false);
$fiber = getFiberFromStream($stream, $url);
$content = $fiber->start($stream);
// save fiber context so we can process them later
$fibers[$key] = [$fiber, $content, $stream];
}
// now poll
$have_unterminated_fibers = true;
while ($have_unterminated_fibers) {
// first suppose we have no work to do
$have_unterminated_fibers = false;
// now loop over fibers to see if any is still working
foreach ($fibers as $key => $item) {
// fetch context
$fiber = $item[0];
$content = $item[1];
$stream = $item[2];
// don't do while till the end here,
// just process next chunk
if (!$fiber->isTerminated()) {
// yep, mark we still have some work left
$have_unterminated_fibers = true;
// update content in the context
$content .= $fiber->resume();
$fibers[$key][1] = $content;
} else {
if ($stream) {
fclose($stream);
// save result for return
$contents[$urls[$key]] = $content;
// mark stream as closed in context
// so it don't close twice
$fibers[$key][2] = null;
}
}
}
}
return $contents;
}