如何从 'on data' 事件中 'pipe' oracle-db 数据

How to 'pipe' oracle-db data from 'on data' event

我已经使用 node-oracledb 几个月了,我已经设法实现了到目前为止我需要的东西。

我目前正在开发一款搜索应用,一次调用可能 return 大约 200 万行数据。为确保我不会与浏览器和服务器断开连接,我想我会尝试使用 queryStream,以便有源源不断的数据流返回客户端。

我按原样实现了 queryStream 示例,这对几十万行来说效果很好。但是,当 returned 行大于一百万时,Node 会耗尽内存。通过记录和观察客户端和服务器日志事件,我可以看到客户端在发送和接收的行数方面远远落后于服务器。所以,看起来 Node 正在崩溃,因为它缓冲了太多数据。

值得注意的是,此时,我的 selectstream 实现在通过 Express 调用的 req/res 函数中。

为了 return 数据,我做了类似....

stream.on('data', function (data) {

    rowcount++;

    let obj = new myObjectConstructor(data);
    res.write(JSON.stringify(obj.getJson());

});

我一直在阅读有关流和管道如何帮助流动的信息,所以我希望能够将查询的结果通过管道传输到 a) 帮助流动和 b ) 以便能够在将结果发送回客户端之前将结果通过管道传递给其他函数。

例如

function getData(req, res){

    var stream = myQueryStream(connection, query);

    stream
        .pipe(toSomeOtherFunction)
        .pipe(yetAnotherFunction)
        .pipe(res);

}

我花了几个小时试图找到一个解决方案或示例来让我通过管道传输结果,但我被卡住了,需要一些帮助。

抱歉,如果我遗漏了一些明显的东西,但我仍在掌握 Node,尤其是流。

提前致谢。

这里有点阻抗不匹配。 queryStream API 发出 JavaScript 行对象,但您想要流式传输到客户端的是一个 JSON 数组。您基本上必须在开头添加一个左括号,在每行之后添加一个逗号,并在末尾添加一个右括号。

我将向您展示如何在直接使用驱动程序的控制器中执行此操作,而不是像我在 this series 中提倡的那样使用单独的数据库模块。

const oracledb = require('oracledb');

async function get(req, res, next) {
  try {
    const conn = await oracledb.getConnection();

    const stream = await conn.queryStream('select * from employees', [], {outFormat: oracledb.OBJECT});

    res.writeHead(200, {'Content-Type': 'application/json'});

    res.write('[');

    stream.on('data', (row) => {
      res.write(JSON.stringify(row));
      res.write(',');
    });

    stream.on('end', () => {
      res.end(']');
    });

    stream.on('close', async () => {
      try {
        await conn.close();
      } catch (err) {
        console.log(err);
      }
    });

    stream.on('error', async (err) => {
      next(err);

      try {
        await conn.close();
      } catch (err) {
        console.log(err);
      }
    });
  } catch (err) {
    next(err);
  }
}

module.exports.get = get;

一旦你掌握了概念,你就可以使用可重用的 Transform class 来简化一些事情,它允许你在控制器逻辑中使用管道:

const oracledb = require('oracledb');
const { Transform } = require('stream');

class ToJSONArray extends Transform {
  constructor() {
    super({objectMode: true});

    this.push('[');
  }

  _transform (row, encoding, callback) {
    if (this._prevRow) {
      this.push(JSON.stringify(this._prevRow));
      this.push(',');
    }

    this._prevRow = row;

    callback(null);
  }

  _flush (done) {
    if (this._prevRow) {
      this.push(JSON.stringify(this._prevRow));
    }

    this.push(']');

    delete this._prevRow;

    done();
  }
}

async function get(req, res, next) {
  try {
    const toJSONArray = new ToJSONArray();
    const conn = await oracledb.getConnection();

    const stream = await conn.queryStream('select * from employees', [], {outFormat: oracledb.OBJECT});

    res.writeHead(200, {'Content-Type': 'application/json'});

    stream.pipe(toJSONArray).pipe(res);

    stream.on('close', async () => {
      try {
        await conn.close();
      } catch (err) {
        console.log(err);
      }
    });

    stream.on('error', async (err) => {
      next(err);

      try {
        await conn.close();
      } catch (err) {
        console.log(err);
      }
    });
  } catch (err) {
    next(err);
  }
}

module.exports.get = get;

无需编写自己的逻辑来创建 JSON 流,您可以使用 JSONStream 将对象流转换为(字符串化)JSON,然后再将其通过管道传输到目标位置(resprocess.stdout 等)这样就无需处理 .on('data',...) 事件。

在下面的示例中,我使用了 pipeline from node's stream module rather than the .pipe method: the effect is similar (with better error handling I think). To get objects from oracledb.queryStream, you can specify option {outFormat: oracledb.OUT_FORMAT_OBJECT} (docs). Then you can make arbitrary modifications to the stream of objects produced. This can be done using a transform stream, made perhaps using through2-map, or if you need to drop or split rows, through2。下面的流在被字符串化为 JSON 后被发送到 process.stdout,但你同样可以发送到它 express 的 res.

require('dotenv').config()   // config from .env file
const JSONStream = require('JSONStream')
const oracledb = require('oracledb')
const { pipeline } = require('stream')  
const map = require('through2-map')   // see https://www.npmjs.com/package/through2-map

oracledb.getConnection({
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  connectString: process.env.CONNECT_STRING
}).then(connection => {
  pipeline(
    connection.queryStream(`
      select dual.*,'test' as col1 from dual 
      union select dual.*, :someboundvalue as col1 from dual 
      `
      ,{"someboundvalue":"test5"} // binds
      ,{
        prefetchRows: 150, // for tuning
        fetchArraySize: 150, // for tuning
        outFormat: oracledb.OUT_FORMAT_OBJECT
      }
    )
    ,map.obj((row,index) => {
      row.arbitraryModification = index 
      return row
    })
    ,JSONStream.stringify() // false gives ndjson
    ,process.stdout     // or send to express's res
    ,(err) => { if(err) console.error(err) }
  )
})

// [
// {"DUMMY":"X","COL1":"test","arbitraryModification":0}
// ,
// {"DUMMY":"X","COL1":"test5","arbitraryModification":1}
// ]