使用 ReactPHP 在响应流中休眠
Time sleep in reponse Stream with ReactPHP
我正在玩 ReactPHP 和响应流。
我已经成功创建了一个 POC 来生成这样的响应流:
function (int $chunks, int $sleep) use ($loop) {
$stream = new ThroughStream();
$loop->addPeriodicTimer($sleep, function (TimerInterface $timer) use ($stream, $chunks, $loop) {
static $i = 0;
$stream->write(microtime(true) . PHP_EOL);
$i++;
if ($i >= $chunks) {
$loop->cancelTimer($timer);
$stream->end();
}
});
return new Response(200, ['Content-Type' => 'text/plain'], $stream);
}
调用curl -X GET -i https://localhost:9091/sleepstream/10/1
会产生
1624222419.1271
1624222420.1282
1624222421.1293
1624222422.1302
1624222423.1312
1624222424.1323
1624222425.1333
1624222426.1342
1624222427.1353
1624222428.1363
每行在前一行之后打印 1 秒。不错。
现在我正在尝试创建一个更逼真的控制器:
function (int $sleep, ServerRequestInterface $request) use ($loop) {
$body = $request->getBody();
assert($body instanceof \React\Stream\ReadableStreamInterface);
$in = new \Clue\React\NDJson\Decoder($body);
$stream = new ThroughStream();
$in->on('data', function ($data) use ($stream, $loop) {
$data->ts = time();
$loop->futureTick(function () use ($stream, $data) {
echo "DATA\n";
$stream->write(\json_encode($data) . PHP_EOL);
sleep(1);
});
});
$in->on('end', function() use ($stream, $loop) {
$loop->addTimer(2, function () use ($stream) {
$stream->end();
});
});
return new Response(200, ['Content-Type' => 'text/plain'], $stream);
}
我正在使用 NDJSON 输入文件 users.ndjson
:
{"id":1,"name":"Alice"}
{"id":2,"name":"Bob"}
{"id":3,"name":"Carol"}
{"id":4,"name":"David"}
{"id":5,"name":"Zach"}
这么称呼:
curl -D /dev/stderr -s -X PUT -T contrib/users.ndjson https://localhost:9091/bridge/ndjson/1
给出:
{"id":1,"name":"Alice","ts":1624231062}
{"id":2,"name":"Bob","ts":1624231062}
{"id":3,"name":"Carol","ts":1624231062}
{"id":4,"name":"David","ts":1624231062}
{"id":5,"name":"Zach","ts":1624231062}
但响应不是每秒逐行收到,而是一次性收到...
服务器日志显示:
DATA
DATA
DATA
DATA
DATA
把握好时机(每秒 1 个)。
我不明白为什么响应流在第二个控制器中被破坏。
编辑:
我开始明白为什么响应流在第二个控制器中被破坏了。
@WyriHaximus 指出 sleep()
正在阻塞循环。所以我现在的问题是:我如何屈服于记录已准备好进行流式传输并模拟睡眠的循环(我正在尝试模拟输入流和输出流之间的计算滞后以验证某些关键点)。
这里是 ReactPHP 核心维护者。您的代码中的 sleep(1);
会阻止事件循环一整秒。并且由于事件循环无法写出数据,因为它在尝试时被阻塞了。此外,如果您添加 time()
调试调用,您可能希望使用 microtime(true)
来更好地可视化写入排队之间的时间。除非我遗漏了什么并且你有充分的理由,否则你通常不想延迟写出处理后得到的数据,因为它会占用你不需要保留的内存。
我正在玩 ReactPHP 和响应流。
我已经成功创建了一个 POC 来生成这样的响应流:
function (int $chunks, int $sleep) use ($loop) {
$stream = new ThroughStream();
$loop->addPeriodicTimer($sleep, function (TimerInterface $timer) use ($stream, $chunks, $loop) {
static $i = 0;
$stream->write(microtime(true) . PHP_EOL);
$i++;
if ($i >= $chunks) {
$loop->cancelTimer($timer);
$stream->end();
}
});
return new Response(200, ['Content-Type' => 'text/plain'], $stream);
}
调用curl -X GET -i https://localhost:9091/sleepstream/10/1
会产生
1624222419.1271
1624222420.1282
1624222421.1293
1624222422.1302
1624222423.1312
1624222424.1323
1624222425.1333
1624222426.1342
1624222427.1353
1624222428.1363
每行在前一行之后打印 1 秒。不错。
现在我正在尝试创建一个更逼真的控制器:
function (int $sleep, ServerRequestInterface $request) use ($loop) {
$body = $request->getBody();
assert($body instanceof \React\Stream\ReadableStreamInterface);
$in = new \Clue\React\NDJson\Decoder($body);
$stream = new ThroughStream();
$in->on('data', function ($data) use ($stream, $loop) {
$data->ts = time();
$loop->futureTick(function () use ($stream, $data) {
echo "DATA\n";
$stream->write(\json_encode($data) . PHP_EOL);
sleep(1);
});
});
$in->on('end', function() use ($stream, $loop) {
$loop->addTimer(2, function () use ($stream) {
$stream->end();
});
});
return new Response(200, ['Content-Type' => 'text/plain'], $stream);
}
我正在使用 NDJSON 输入文件 users.ndjson
:
{"id":1,"name":"Alice"}
{"id":2,"name":"Bob"}
{"id":3,"name":"Carol"}
{"id":4,"name":"David"}
{"id":5,"name":"Zach"}
这么称呼:
curl -D /dev/stderr -s -X PUT -T contrib/users.ndjson https://localhost:9091/bridge/ndjson/1
给出:
{"id":1,"name":"Alice","ts":1624231062}
{"id":2,"name":"Bob","ts":1624231062}
{"id":3,"name":"Carol","ts":1624231062}
{"id":4,"name":"David","ts":1624231062}
{"id":5,"name":"Zach","ts":1624231062}
但响应不是每秒逐行收到,而是一次性收到...
服务器日志显示:
DATA
DATA
DATA
DATA
DATA
把握好时机(每秒 1 个)。
我不明白为什么响应流在第二个控制器中被破坏。
编辑: 我开始明白为什么响应流在第二个控制器中被破坏了。
@WyriHaximus 指出 sleep()
正在阻塞循环。所以我现在的问题是:我如何屈服于记录已准备好进行流式传输并模拟睡眠的循环(我正在尝试模拟输入流和输出流之间的计算滞后以验证某些关键点)。
这里是 ReactPHP 核心维护者。您的代码中的 sleep(1);
会阻止事件循环一整秒。并且由于事件循环无法写出数据,因为它在尝试时被阻塞了。此外,如果您添加 time()
调试调用,您可能希望使用 microtime(true)
来更好地可视化写入排队之间的时间。除非我遗漏了什么并且你有充分的理由,否则你通常不想延迟写出处理后得到的数据,因为它会占用你不需要保留的内存。