使用 ReactPHP 在响应流中休眠

Time sleep in reponse Stream with ReactPHP

我正在玩 ReactPHP 和响应流。

我已经成功创建了一个 POC 来生成这样的响应流:

function (int $chunks, int $sleep) use ($loop) {
    $stream = new ThroughStream();
    $loop->addPeriodicTimer($sleep, function (TimerInterface $timer) use ($stream, $chunks, $loop) {
        static $i = 0;
        $stream->write(microtime(true) . PHP_EOL);
        $i++;
        if ($i >= $chunks) {
            $loop->cancelTimer($timer);
            $stream->end();
        }
    });
    return new Response(200, ['Content-Type' => 'text/plain'], $stream);
}

调用curl -X GET -i https://localhost:9091/sleepstream/10/1会产生

1624222419.1271
1624222420.1282
1624222421.1293
1624222422.1302
1624222423.1312
1624222424.1323
1624222425.1333
1624222426.1342
1624222427.1353
1624222428.1363

每行在前一行之后打印 1 秒。不错。

现在我正在尝试创建一个更逼真的控制器:

function (int $sleep, ServerRequestInterface $request) use ($loop) {
    $body = $request->getBody();
    assert($body instanceof \React\Stream\ReadableStreamInterface);
                    
    $in = new \Clue\React\NDJson\Decoder($body);
                    
    $stream = new ThroughStream();

    $in->on('data', function ($data) use ($stream, $loop) {
        $data->ts = time();
        $loop->futureTick(function () use ($stream, $data) {
            echo "DATA\n";
            $stream->write(\json_encode($data) . PHP_EOL);
            sleep(1);
        });
    });
                    
    $in->on('end', function() use ($stream, $loop) {
        $loop->addTimer(2, function () use ($stream) {
            $stream->end();
        });
    });
    return new Response(200, ['Content-Type' => 'text/plain'], $stream);
}

我正在使用 NDJSON 输入文件 users.ndjson:

{"id":1,"name":"Alice"}
{"id":2,"name":"Bob"}
{"id":3,"name":"Carol"}
{"id":4,"name":"David"}
{"id":5,"name":"Zach"}

这么称呼:

curl -D /dev/stderr -s -X PUT -T contrib/users.ndjson https://localhost:9091/bridge/ndjson/1

给出:

{"id":1,"name":"Alice","ts":1624231062}
{"id":2,"name":"Bob","ts":1624231062}
{"id":3,"name":"Carol","ts":1624231062}
{"id":4,"name":"David","ts":1624231062}
{"id":5,"name":"Zach","ts":1624231062}

但响应不是每秒逐行收到,而是一次性收到...

服务器日志显示:

DATA
DATA
DATA
DATA
DATA

把握好时机(每秒 1 个)。

我不明白为什么响应流在第二个控制器中被破坏。

编辑: 我开始明白为什么响应流在第二个控制器中被破坏了。

@WyriHaximus 指出 sleep() 正在阻塞循环。所以我现在的问题是:我如何屈服于记录已准备好进行流式传输并模拟睡眠的循环(我正在尝试模拟输入流和输出流之间的计算滞后以验证某些关键点)。

这里是 ReactPHP 核心维护者。您的代码中的 sleep(1); 会阻止事件循环一整秒。并且由于事件循环无法写出数据,因为它在尝试时被阻塞了。此外,如果您添加 time() 调试调用,您可能希望使用 microtime(true) 来更好地可视化写入排队之间的时间。除非我遗漏了什么并且你有充分的理由,否则你通常不想延迟写出处理后得到的数据,因为它会占用你不需要保留的内存。