使用 Fetch Streams API 在不使用递归的情况下异步使用分块数据
Use Fetch Streams API to consume chunked data asynchronously without using recursion
我正在使用 JavaScript fetch
streams API to consume chunked JSON asynchronously like in this answer。
我的应用程序可能在一个小时内每秒接收多达 25 个 JSON 小对象(视频中的每一帧一个对象)。
当传入的块很大时(每个块 1000+ JSON 个对象),我的代码运行良好 - 速度快,内存使用最少 - 它可以轻松可靠地接收 1,000,000 JSON 个对象。
当传入的块较小时(每个块 5 JSON 个对象),我的代码运行不佳 - 速度慢,内存消耗大。浏览器死于大约 50,000 个 JSON 个对象。
在 Developer 工具中进行了大量调试后,问题似乎出在代码的递归性质上。
我试图删除递归,但这似乎是必需的,因为 API 依赖于我的代码返回对链的承诺?!
如何删除这个递归,或者我应该使用 fetch
以外的东西?
递归代码(有效)
String.prototype.replaceAll = function(search, replacement) {
var target = this;
return target.replace(new RegExp(search, 'g'), replacement);
};
results = []
fetch('http://localhost:9999/').then(response => {
const reader = response.body.getReader();
td = new TextDecoder("utf-8");
buffer = "";
reader.read().then(function processText({ done, value }) {
if (done) {
console.log("Stream done.");
return;
}
try {
decoded = td.decode(value);
buffer += decoded;
if (decoded.length != 65536){
toParse = "["+buffer.trim().replaceAll("\n",",")+"]";
result = JSON.parse(toParse);
results.push(...result);
console.log("Received " + results.length.toString() + " objects")
buffer = "";
}
}
catch(e){
// Doesn't need to be reported, because partial JSON result will be parsed next time around (from buffer).
//console.log("EXCEPTION:"+e);
}
return reader.read().then(processText);
})
});
没有递归的代码(不起作用)
String.prototype.replaceAll = function(search, replacement) {
var target = this;
return target.replace(new RegExp(search, 'g'), replacement);
};
results = []
finished = false
fetch('http://localhost:9999/').then(response => {
const reader = response.body.getReader();
td = new TextDecoder("utf-8");
buffer = "";
lastResultSize = -1
while (!finished)
if (lastResultSize < results.length)
{
lastResultSize = results.length;
reader.read().then(function processText({ done, value }) {
if (done) {
console.log("Stream done.");
finished = true;
return;
}
else
try {
decoded = td.decode(value);
//console.log("Received chunk " + decoded.length.toString() + " in length");
buffer += decoded;
if (decoded.length != 65536){
toParse = "["+buffer.trim().replaceAll("\n",",")+"]";
result = JSON.parse(toParse);
results.push(...result);
console.log("Received " + results.length.toString() + " objects")
buffer = "";
//console.log("Parsed chunk " + toParse.length.toString() + " in length");
}
}
catch(e) {
// Doesn't need to be reported, because partial JSON result will be parsed next time around (from buffer).
//console.log("EXCEPTION:"+e);
}
})
}
});
为了完整起见,这里是我在测试服务器上使用的 python 代码。请注意包含 sleep 的行,它改变了分块行为:
import io
import urllib
import inspect
from http.server import HTTPServer,BaseHTTPRequestHandler
from time import sleep
class TestServer(BaseHTTPRequestHandler):
def do_GET(self):
args = urllib.parse.parse_qs(self.path[2:])
args = {i:args[i][0] for i in args}
response = ''
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Transfer-Encoding', 'chunked')
self.end_headers()
for i in range (1000000):
self.wfile.write(bytes(f'{{"x":{i}, "text":"fred!"}}\n','utf-8'))
sleep(0.001) # Comment this out for bigger chunks sent to the client!
def main(server_port:"Port to serve on."=9999,server_address:"Local server name."=''):
httpd = HTTPServer((server_address, server_port), TestServer)
print(f'Serving on http://{httpd.server_name}:{httpd.server_port} ...')
httpd.serve_forever()
if __name__ == '__main__':
main()
您缺少的部分是传递给 .then()
的函数总是异步调用的,即使用空堆栈。所以这里没有实际的递归。这也是为什么您的 'without recursion' 版本不起作用的原因。
对此的简单解决方案是使用异步函数和 await 语句。如果你这样调用 read() :
const {value, done} = await reader.read();
...然后您可以在循环中调用它,它将按您期望的方式工作。
我不知道你的具体内存泄漏在哪里,但是你对全局变量的使用看起来有问题。我建议您始终将 'use strict';
放在代码的顶部,这样编译器会为您捕获这些问题。然后在声明变量时使用 let
或 const
。
我建议您使用 TextDecoderStream 以避免在多个块之间拆分字符时出现问题。当 JSON 对象在多个块之间拆分时,您也会遇到问题。
请参阅 Append child writable stream demo 了解如何安全地执行此操作(但请注意,您需要 TextDecoderStream,该演示具有 "TextDecoder")。
另请注意该演示中对 WritableStream 的使用。 Firefox 尚不支持 AFAIK,但 WritableStream 提供了更简单的语法来使用块,而无需显式循环或递归。您可以找到网络流 polyfill here.
我正在使用 JavaScript fetch
streams API to consume chunked JSON asynchronously like in this answer。
我的应用程序可能在一个小时内每秒接收多达 25 个 JSON 小对象(视频中的每一帧一个对象)。
当传入的块很大时(每个块 1000+ JSON 个对象),我的代码运行良好 - 速度快,内存使用最少 - 它可以轻松可靠地接收 1,000,000 JSON 个对象。
当传入的块较小时(每个块 5 JSON 个对象),我的代码运行不佳 - 速度慢,内存消耗大。浏览器死于大约 50,000 个 JSON 个对象。
在 Developer 工具中进行了大量调试后,问题似乎出在代码的递归性质上。
我试图删除递归,但这似乎是必需的,因为 API 依赖于我的代码返回对链的承诺?!
如何删除这个递归,或者我应该使用 fetch
以外的东西?
递归代码(有效)
String.prototype.replaceAll = function(search, replacement) {
var target = this;
return target.replace(new RegExp(search, 'g'), replacement);
};
results = []
fetch('http://localhost:9999/').then(response => {
const reader = response.body.getReader();
td = new TextDecoder("utf-8");
buffer = "";
reader.read().then(function processText({ done, value }) {
if (done) {
console.log("Stream done.");
return;
}
try {
decoded = td.decode(value);
buffer += decoded;
if (decoded.length != 65536){
toParse = "["+buffer.trim().replaceAll("\n",",")+"]";
result = JSON.parse(toParse);
results.push(...result);
console.log("Received " + results.length.toString() + " objects")
buffer = "";
}
}
catch(e){
// Doesn't need to be reported, because partial JSON result will be parsed next time around (from buffer).
//console.log("EXCEPTION:"+e);
}
return reader.read().then(processText);
})
});
没有递归的代码(不起作用)
String.prototype.replaceAll = function(search, replacement) {
var target = this;
return target.replace(new RegExp(search, 'g'), replacement);
};
results = []
finished = false
fetch('http://localhost:9999/').then(response => {
const reader = response.body.getReader();
td = new TextDecoder("utf-8");
buffer = "";
lastResultSize = -1
while (!finished)
if (lastResultSize < results.length)
{
lastResultSize = results.length;
reader.read().then(function processText({ done, value }) {
if (done) {
console.log("Stream done.");
finished = true;
return;
}
else
try {
decoded = td.decode(value);
//console.log("Received chunk " + decoded.length.toString() + " in length");
buffer += decoded;
if (decoded.length != 65536){
toParse = "["+buffer.trim().replaceAll("\n",",")+"]";
result = JSON.parse(toParse);
results.push(...result);
console.log("Received " + results.length.toString() + " objects")
buffer = "";
//console.log("Parsed chunk " + toParse.length.toString() + " in length");
}
}
catch(e) {
// Doesn't need to be reported, because partial JSON result will be parsed next time around (from buffer).
//console.log("EXCEPTION:"+e);
}
})
}
});
为了完整起见,这里是我在测试服务器上使用的 python 代码。请注意包含 sleep 的行,它改变了分块行为:
import io
import urllib
import inspect
from http.server import HTTPServer,BaseHTTPRequestHandler
from time import sleep
class TestServer(BaseHTTPRequestHandler):
def do_GET(self):
args = urllib.parse.parse_qs(self.path[2:])
args = {i:args[i][0] for i in args}
response = ''
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Transfer-Encoding', 'chunked')
self.end_headers()
for i in range (1000000):
self.wfile.write(bytes(f'{{"x":{i}, "text":"fred!"}}\n','utf-8'))
sleep(0.001) # Comment this out for bigger chunks sent to the client!
def main(server_port:"Port to serve on."=9999,server_address:"Local server name."=''):
httpd = HTTPServer((server_address, server_port), TestServer)
print(f'Serving on http://{httpd.server_name}:{httpd.server_port} ...')
httpd.serve_forever()
if __name__ == '__main__':
main()
您缺少的部分是传递给 .then()
的函数总是异步调用的,即使用空堆栈。所以这里没有实际的递归。这也是为什么您的 'without recursion' 版本不起作用的原因。
对此的简单解决方案是使用异步函数和 await 语句。如果你这样调用 read() :
const {value, done} = await reader.read();
...然后您可以在循环中调用它,它将按您期望的方式工作。
我不知道你的具体内存泄漏在哪里,但是你对全局变量的使用看起来有问题。我建议您始终将 'use strict';
放在代码的顶部,这样编译器会为您捕获这些问题。然后在声明变量时使用 let
或 const
。
我建议您使用 TextDecoderStream 以避免在多个块之间拆分字符时出现问题。当 JSON 对象在多个块之间拆分时,您也会遇到问题。
请参阅 Append child writable stream demo 了解如何安全地执行此操作(但请注意,您需要 TextDecoderStream,该演示具有 "TextDecoder")。
另请注意该演示中对 WritableStream 的使用。 Firefox 尚不支持 AFAIK,但 WritableStream 提供了更简单的语法来使用块,而无需显式循环或递归。您可以找到网络流 polyfill here.