我如何使用 RxJS observables 实现数据库 connection/select/close

How do I implement a database connection/select/close using RxJS observables

我正在使用 node-oracledb 连接到 Oracle 数据库。 API 提供了自己的承诺,可以转换为 Promise<T>,因此 "converted" 转换为 Observable<T>

使用 Observables,我想:

  1. 打开数据库连接
  2. Select N条记录
  3. 关闭数据库连接,即使#2 抛出异常

使用传统的、阻塞的、程序化的方式,它会是这样的:

try
{
    connection = Oracle.getConnection(...);
    resultSet = connection.execute("SELECT ... FROM ...");
}
catch (Exception)
{
    resultSet = EMPTY_RESULT;
}
finally
{
    if (connection)
        connection.close();
}

我尝试使用 Observables 编写此代码导致大量代码和回调。

受保护的方法 getConnection() 仍然非常简单:

import * as Oracle from "oracledb";

protected getConnection() : Observable<IConnection>
{
    return OraUtil.from(Oracle.getConnection(this.parameters));
}

closeConnection() 方法也是如此。我在这里直接使用了 promise,以避免更多代码。

protected closeConnection(subscriber : Subscriber<IExecuteReturn>, connection : IConnection) : void
{
    connection.close()
        .then(() => subscriber.complete())
        .catch((error) => subscriber.error());
}

但是 execute() 方法是麻烦的开始。

protected _execute(connection : IConnection, statement : string) : Observable<IExecuteReturn>
{
    return new Observable<IExecuteReturn>(
        (subscriber) => {
            OraUtil.from(connection.execute(statement)).subscribe(
                (result) => subscriber.next(result),
                (error) => {
                    subscriber.error(error);
                    this.closeConnection(subscriber, connection);
                },
                () => {
                    this.closeConnection(subscriber, connection);
                });
        });
}

public execute(statement : string) : Observable<IExecuteReturn>
{
    return this.getConnection().pipe(
        flatMap((connection) => this._execute(connection, statement))
    );
}

一般来说,当使用 RxJs 6.x 时,类似 "connect + execute + close" 序列的实现是沿着这些线

let connection;

Oracle.getConnection(....)
.pipe(
  switchMap(conn => {
    connection = conn;
    return connection.execute(statement);
  }),
  finalize(() => connection.close())
)
.subscribe(
  result => resultSet = result,
  error => {
    console.error(error);
  }
)

语法细节可能不同,但关键思想是一旦连接 Observable 发出,然后你 switchMap 执行语句。当您订阅时,整个链就会被激活。在订阅中,您可以在出现错误或执行返回的 Observable 完成时关闭连接。

这就是我通常处理连接管理的方式。核心是 using observable creator,它接受资源工厂作为第一个参数,设置函数作为第二个参数。

using(() => { unsubscribe() }, resource => observableOf(resource))

resource 是一个具有 unsubscribe 方法的对象,作为取消订阅的一部分被调用 - 因此您可以在那里隐藏任何逻辑,并有效地将任意对象的生命周期绑定到可观察对象的生命周期。

我希望下面的代码有意义。

import * as Oracle from "oracledb";
import { mergeMap , ignoreElements} from 'rxjs/operators';
import { using } from 'rxjs/observable/using';
import { from as observableFrom } from 'rxjs/observable/from';
import { concat } from 'rxjs/observable/concat';
import { defer } from 'rxjs/observable/defer';
import { empty as observableEmpty } from 'rxjs/observable/empty';

class OracleConnection {
  constructor(parameters) {
    this.isClosed = false;
    this.connection = null;
    this.parameters = parameters;
  }

  setup() {
    return defer(() => Oracle.getConnection(this.parameters)
      .then(connection => { // do this in promise in case observable gets unsubscribed before connection is established
        this.connection = connection;
        if (this.isClosed) { // close in case connection got already closed before even got established
          this.terminate();
        }
        return connection;
      }));
  }

  close() {
    this.isClosed = true;
    if (this.connection !== null) {
      const connection = this.connection;
      this.connection = null;

      return observableFrom(connection.close())
        .pipe(ignoreElements()) // only propagate errors
    }

    return observableEmpty(); // connection already closed
  }
  
  terminate() {
    this.close().subscribe(/* handle error from connection close */);
  }

  unsubscribe() { // this will get called on observable unsubscribe
    if (!this.isClosed) {
      this.terminate();
    }
  }
}

class ConnectionManager {
  constructor(params) {
    this.params = params;
  }

  getConnection() {
    return using(() => new OracleConnection(this.params), oracleConnection => oracleConnection.setup())
  }
}

const manager = new ConnectionManager({ /* some params */ });

manager.getConnection()
  .pipe(
    mergeMap(connection => concat(
      connection.execute('SELECT 1'),
      connection.close() // explicitly close connection
    )),
    // alternatively
    // take(1) // to close connection automatically
  );

例如,您可以做的很酷的事情是在失败的情况下轻松重试连接:

oracle.getConnection()
  .pipe(
    retry(3)
    ...
  );