NodeJs 流、管道和 https 帖子
NodeJs Streams, pipelines, and https posts
我需要进行一些完整性检查。
我是运行节点11.10.1
我有一个使用 nodejs oracledb 库从 oracle 数据库读取的进程。有一个流式传输功能,我执行 select * 并以 10k 个对象的批次流式传输结果。然后我 post 通过 https 将此数据发送到索引器。对象流被注入管道函数。
我已经使用他跟随代码一段时间了。我正在尝试调试吞吐量。有时我每秒可以看到大约 2k 个文档正在通过此管道处理。大多数时候我看到<150。在我开始调试我的索引服务器之前。我想确保这些功能编码正确。
async function streamReindex(databaseStream) {
let pipeline = util.promisify(stream.pipeline)
await pipeline(
selectStream,// "oracledb": "^4.0.0", stream function
camelize.camelizeStream(), //"camelize2": "^1.0.0", library wrapped in ,"through2": "^3.0.1" library to make it an object stream
JSONStream.stringify(), //"JSONStream": "^1.3.5"
reindexClient.streamReindex(core)
)
}
// reindexClient code.
function streamReindex(core) {
const updateUrl = baseUrl + core + '/update'
const options = {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
'auth': `${user.username}:${user.password}`,
}
let postStream = https.request(updateUrl, options, (res) => {
let response = {
status: {
code: res.statusCode,
message: res.statusMessage
},
headers: res.headers,
}
if (res.statusCode !== 200) {
postStream.destroy(new Error(JSON.stringify(response)))
}
})
postStream.on('error', (err)=>{
throw new Error(err)
})
postStream.on('socket', (socket) => {
socket.setKeepAlive(true, 240000)
})
return postStream
}
async function selectStream(sql, bindings = [], fetchSize =
fetchArraySize) {
let connection = await knex.client.acquireConnection()
log.info(`Fetch size is set to ${fetchSize}`)
let select = connection.queryStream(sql, bindings, {
fetchArraySize: fetchSize,
outFormat: outFormat
})
select.on('error', (err) => {
log.error('Oracle Error Event', err)
knex.client.releaseConnection(connection)
})
select.on('close', () => {
log.info('Oracle Close Event')
knex.client.releaseConnection(connection)
select = null
connection = null
})
return select
}
如果我从管道中删除 reindexClient.streamReindex(core) 函数。我看到每秒约 5k 个对象的吞吐量。我正在研究流的 highwatermark 功能,但似乎无法弄清楚如何在 postStream 上应用它。如果我 console.log post 流式传输它也不会说它处于对象模式。这意味着它的高水位线以字节为单位,我相信它的阈值很低。
如果您需要更多信息,我会尽量提供。
虽然您的问题似乎与 oracledb 无关,但我将其放在此处以便我可以格式化代码。您可能会通过调整 oracledb 流获得一些性能优势,例如:
diff --git a/lib/queryStream.js b/lib/queryStream.js
index 08ddc720..11953e4b 100644
--- a/lib/queryStream.js
+++ b/lib/queryStream.js
@@ -24,7 +24,7 @@ const { Readable } = require('stream');
class QueryStream extends Readable {
constructor(rs) {
- super({ objectMode: true });
+ super({ objectMode: true, highWaterMark: 64 }); // choose your own value
this._fetching = false;
this._numRows = 0;
欢迎将高水位线设置为 queryStream()
选项的 PR。
因此,我通过自己实现 JSONStream.stringify() 函数,将吞吐量提高了近 500/docs/秒。因为它不允许我设置水印。一旦我这样做了,我就能够真正提高它,但我可以使用我所有的记忆。使用以下代码进行设置使我能够长期稳定地占用内存,并具有更好的吞吐量。我还摆脱了 through2 库,并将 camelize 功能放在我的 stringify Transfrom 中。大部分代码和解释都可以在这里找到:https://blog.dmcquay.com/2017/09/06/node-stream-db-results-with-transform.html
代码如下:
function camelizeAndStringify() {
let first = true
const serialize = new Transform({
objectMode: true,
highWaterMark: 1000,
transform(chunk, encoding, callback) {
if (first) {
this.push('[' + JSON.stringify(camelize(chunk)))
first = false
} else {
this.push(',' + JSON.stringify(camelize(chunk)))
}
callback()
chunk = null
},
flush(callback) {
this.push(']')
callback()
}
})
return serialize
}
我需要进行一些完整性检查。
我是运行节点11.10.1
我有一个使用 nodejs oracledb 库从 oracle 数据库读取的进程。有一个流式传输功能,我执行 select * 并以 10k 个对象的批次流式传输结果。然后我 post 通过 https 将此数据发送到索引器。对象流被注入管道函数。
我已经使用他跟随代码一段时间了。我正在尝试调试吞吐量。有时我每秒可以看到大约 2k 个文档正在通过此管道处理。大多数时候我看到<150。在我开始调试我的索引服务器之前。我想确保这些功能编码正确。
async function streamReindex(databaseStream) {
let pipeline = util.promisify(stream.pipeline)
await pipeline(
selectStream,// "oracledb": "^4.0.0", stream function
camelize.camelizeStream(), //"camelize2": "^1.0.0", library wrapped in ,"through2": "^3.0.1" library to make it an object stream
JSONStream.stringify(), //"JSONStream": "^1.3.5"
reindexClient.streamReindex(core)
)
}
// reindexClient code.
function streamReindex(core) {
const updateUrl = baseUrl + core + '/update'
const options = {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
'auth': `${user.username}:${user.password}`,
}
let postStream = https.request(updateUrl, options, (res) => {
let response = {
status: {
code: res.statusCode,
message: res.statusMessage
},
headers: res.headers,
}
if (res.statusCode !== 200) {
postStream.destroy(new Error(JSON.stringify(response)))
}
})
postStream.on('error', (err)=>{
throw new Error(err)
})
postStream.on('socket', (socket) => {
socket.setKeepAlive(true, 240000)
})
return postStream
}
async function selectStream(sql, bindings = [], fetchSize =
fetchArraySize) {
let connection = await knex.client.acquireConnection()
log.info(`Fetch size is set to ${fetchSize}`)
let select = connection.queryStream(sql, bindings, {
fetchArraySize: fetchSize,
outFormat: outFormat
})
select.on('error', (err) => {
log.error('Oracle Error Event', err)
knex.client.releaseConnection(connection)
})
select.on('close', () => {
log.info('Oracle Close Event')
knex.client.releaseConnection(connection)
select = null
connection = null
})
return select
}
如果我从管道中删除 reindexClient.streamReindex(core) 函数。我看到每秒约 5k 个对象的吞吐量。我正在研究流的 highwatermark 功能,但似乎无法弄清楚如何在 postStream 上应用它。如果我 console.log post 流式传输它也不会说它处于对象模式。这意味着它的高水位线以字节为单位,我相信它的阈值很低。
如果您需要更多信息,我会尽量提供。
虽然您的问题似乎与 oracledb 无关,但我将其放在此处以便我可以格式化代码。您可能会通过调整 oracledb 流获得一些性能优势,例如:
diff --git a/lib/queryStream.js b/lib/queryStream.js
index 08ddc720..11953e4b 100644
--- a/lib/queryStream.js
+++ b/lib/queryStream.js
@@ -24,7 +24,7 @@ const { Readable } = require('stream');
class QueryStream extends Readable {
constructor(rs) {
- super({ objectMode: true });
+ super({ objectMode: true, highWaterMark: 64 }); // choose your own value
this._fetching = false;
this._numRows = 0;
欢迎将高水位线设置为 queryStream()
选项的 PR。
因此,我通过自己实现 JSONStream.stringify() 函数,将吞吐量提高了近 500/docs/秒。因为它不允许我设置水印。一旦我这样做了,我就能够真正提高它,但我可以使用我所有的记忆。使用以下代码进行设置使我能够长期稳定地占用内存,并具有更好的吞吐量。我还摆脱了 through2 库,并将 camelize 功能放在我的 stringify Transfrom 中。大部分代码和解释都可以在这里找到:https://blog.dmcquay.com/2017/09/06/node-stream-db-results-with-transform.html
代码如下:
function camelizeAndStringify() {
let first = true
const serialize = new Transform({
objectMode: true,
highWaterMark: 1000,
transform(chunk, encoding, callback) {
if (first) {
this.push('[' + JSON.stringify(camelize(chunk)))
first = false
} else {
this.push(',' + JSON.stringify(camelize(chunk)))
}
callback()
chunk = null
},
flush(callback) {
this.push(']')
callback()
}
})
return serialize
}