How to use multiple async fread with Fibers in PHP?

I want to use fread with Fibers to get the contents from each url in a list, where each stream doesn't have to wait for feof before another fread runs in another Fiber.

My current code is the following:

<?php

function getFiberFromStream($stream, $url): Fiber {
    // each fiber reads its stream in 100-byte chunks and suspends
    // after every chunk, handing that chunk back to the caller
    return new Fiber(function ($stream) use ($url): void {
        while (!feof($stream)) {
            echo "reading 100 bytes from $url".PHP_EOL;
            $contents = fread($stream, 100);
            Fiber::suspend($contents);
        }
    });
}

function getContents(array $urls): array {

    $contents = [];

    foreach ($urls as $key => $url) {

        $stream = fopen($url, 'r');
        stream_set_blocking($stream, false);
        $fiber = getFiberFromStream($stream, $url);
        $content = $fiber->start($stream);

        while (!$fiber->isTerminated()) {
            $content .= $fiber->resume();
        }
        fclose($stream);

        $contents[$urls[$key]] = $content;
    }

    return $contents;
}

$urls = [
    'https://www.google.com/',
    'https://www.twitter.com',
    'https://www.facebook.com'
];

var_dump(getContents($urls));

Unfortunately, the echo calls in getFiberFromStream() show that the current code waits for the whole content of one url before moving on to the next:

reading 100 bytes from https://www.google.com
reading 100 bytes from https://www.google.com
reading 100 bytes from https://www.google.com //finished
reading 100 bytes from https://www.twitter.com
reading 100 bytes from https://www.twitter.com
reading 100 bytes from https://www.twitter.com //finished
reading 100 bytes from https://www.facebook.com
[...]

I would like something like this:

reading 100 bytes from https://www.google.com
reading 100 bytes from https://www.twitter.com
reading 100 bytes from https://www.facebook.com
reading 100 bytes from https://www.google.com
reading 100 bytes from https://www.twitter.com
reading 100 bytes from https://www.facebook.com
[...]

The behavior you are seeing is because you poll the current fiber until it is completely finished before moving on to the next one.

The solution here is to start the fibers for all urls first, and only then poll them.

Try something like this:

function getContents(array $urls): array {

    $contents = [];
    $fibers = [];

    // start them all up
    foreach ($urls as $key => $url) {

        $stream = fopen($url, 'r');
        stream_set_blocking($stream, false);
        $fiber = getFiberFromStream($stream, $url);
        $content = $fiber->start($stream);

        // save the fiber context so we can poll it later
        $fibers[$key] = [$fiber, $content, $stream];
    }

    // now poll
    $have_unterminated_fibers = true;
    while ($have_unterminated_fibers) {

        // first assume we have no work left to do
        $have_unterminated_fibers = false;

        // now loop over the fibers to see if any are still working
        foreach ($fibers as $key => $item) {
            // fetch the context
            [$fiber, $content, $stream] = $item;

            // don't loop here until feof,
            // just process the next chunk
            if (!$fiber->isTerminated()) {
                // yep, mark that we still have some work left
                $have_unterminated_fibers = true;

                // append the new chunk to the content in the context
                $content .= $fiber->resume();
                $fibers[$key][1] = $content;
            } else {
                if ($stream) {
                    fclose($stream);

                    // save result for return
                    $contents[$urls[$key]] = $content;

                    // mark the stream as closed in the context
                    // so it doesn't get closed twice
                    $fibers[$key][2] = null;
                }
            }
        }
    }

    return $contents;
}
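
A design note on the polling loop: because the streams are non-blocking, fread() returns immediately even when no data has arrived yet, so the while loop above spins at full CPU while waiting. As a possible refinement (my own sketch, not part of the original answer; the getContentsWithSelect() name is made up here, and it assumes your platform supports stream_select() on http:// streams), you can let stream_select() put the process to sleep until at least one stream actually has data, and only resume the fibers whose streams are ready:

function getContentsWithSelect(array $urls): array {
    $contents = [];
    $fibers = [];

    // start one fiber per url, exactly as above
    foreach ($urls as $key => $url) {
        $stream = fopen($url, 'r');
        stream_set_blocking($stream, false);
        $fiber = getFiberFromStream($stream, $url);
        $fibers[$key] = [$fiber, $fiber->start($stream), $stream];
    }

    while ($fibers) {
        // reap finished fibers, collect the streams still in flight
        $read = [];
        foreach ($fibers as $key => [$fiber, $content, $stream]) {
            if ($fiber->isTerminated()) {
                fclose($stream);
                $contents[$urls[$key]] = $content;
                unset($fibers[$key]);
            } else {
                $read[$key] = $stream;
            }
        }
        if (!$read) {
            break; // nothing left to wait on
        }

        // sleep (up to 1s) until at least one stream has data or hits EOF
        $write = $except = null;
        if (stream_select($read, $write, $except, 1) === false) {
            break; // select failed; bail out
        }

        // resume only the fibers whose streams are ready
        foreach ($read as $key => $stream) {
            $fibers[$key][1] .= $fibers[$key][0]->resume();
        }
    }

    return $contents;
}

stream_select() trims $read down to the streams that are readable (streams at EOF also count as readable, which lets finished fibers be reaped on the next pass), so each resume() here should have a chunk to read instead of busy-waiting on empty reads.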