我如何使用 RxJS observables 实现数据库 connection/select/close
How do I implement a database connection/select/close using RxJS observables
我正在使用 node-oracledb 连接到 Oracle 数据库。 API 提供了自己的承诺,可以转换为 Promise<T>
,因此 "converted" 转换为 Observable<T>
。
使用 Observables,我想:
- 打开数据库连接
- Select N条记录
- 关闭数据库连接,即使#2 抛出异常。
使用传统的、阻塞的、程序化的方式,它会是这样的:
try
{
connection = Oracle.getConnection(...);
resultSet = connection.execute("SELECT ... FROM ...");
}
catch (Exception)
{
resultSet = EMPTY_RESULT;
}
finally
{
if (connection)
connection.close();
}
我尝试使用 Observables 编写此代码导致大量代码和回调。
受保护的方法 getConnection() 仍然非常简单:
import * as Oracle from "oracledb";
protected getConnection() : Observable<IConnection>
{
return OraUtil.from(Oracle.getConnection(this.parameters));
}
closeConnection()
方法也是如此。我在这里直接使用了 promise,以避免更多代码。
protected closeConnection(subscriber : Subscriber<IExecuteReturn>, connection : IConnection) : void
{
connection.close()
.then(() => subscriber.complete())
.catch((error) => subscriber.error());
}
但是 execute()
方法是麻烦的开始。
protected _execute(connection : IConnection, statement : string) : Observable<IExecuteReturn>
{
return new Observable<IExecuteReturn>(
(subscriber) => {
OraUtil.from(connection.execute(statement)).subscribe(
(result) => subscriber.next(result),
(error) => {
subscriber.error(error);
this.closeConnection(subscriber, connection);
},
() => {
this.closeConnection(subscriber, connection);
});
});
}
public execute(statement : string) : Observable<IExecuteReturn>
{
return this.getConnection().pipe(
flatMap((connection) => this._execute(connection, statement))
);
}
一般来说,当使用 RxJs 6.x 时,类似 "connect + execute + close" 序列的实现是沿着这些线
let connection;
Oracle.getConnection(....)
.pipe(
switchMap(conn => {
connection = conn;
return connection.execute(statement);
}),
finalize(() => connection.close())
)
.subscribe(
result => resultSet = result,
error => {
console.error(error);
}
)
语法细节可能不同,但关键思想是一旦连接 Observable 发出,然后你 switchMap
执行语句。当您订阅时,整个链就会被激活。在订阅中,您可以在出现错误或执行返回的 Observable 完成时关闭连接。
这就是我通常处理连接管理的方式。核心是 using
observable creator,它接受资源工厂作为第一个参数,设置函数作为第二个参数。
using(() => { unsubscribe() }, resource => observableOf(resource))
resource
是一个具有 unsubscribe
方法的对象,作为取消订阅的一部分被调用 - 因此您可以在那里隐藏任何逻辑,并有效地将任意对象的生命周期绑定到可观察对象的生命周期。
我希望下面的代码有意义。
import * as Oracle from "oracledb";
import { mergeMap , ignoreElements} from 'rxjs/operators';
import { using } from 'rxjs/observable/using';
import { from as observableFrom } from 'rxjs/observable/from';
import { concat } from 'rxjs/observable/concat';
import { defer } from 'rxjs/observable/defer';
import { empty as observableEmpty } from 'rxjs/observable/empty';
class OracleConnection {
constructor(parameters) {
this.isClosed = false;
this.connection = null;
this.parameters = parameters;
}
setup() {
return defer(() => Oracle.getConnection(this.parameters)
.then(connection => { // do this in promise in case observable gets unsubscribed before connection is established
this.connection = connection;
if (this.isClosed) { // close in case connection got already closed before even got established
this.terminate();
}
return connection;
}));
}
close() {
this.isClosed = true;
if (this.connection !== null) {
const connection = this.connection;
this.connection = null;
return observableFrom(connection.close())
.pipe(ignoreElements()) // only propagate errors
}
return observableEmpty(); // connection already closed
}
terminate() {
this.close().subscribe(/* handle error from connection close */);
}
unsubscribe() { // this will get called on observable unsubscribe
if (!this.isClosed) {
this.terminate();
}
}
}
class ConnectionManager {
constructor(params) {
this.params = params;
}
getConnection() {
return using(() => new OracleConnection(this.params), oracleConnection => oracleConnection.setup())
}
}
const manager = new ConnectionManager({ /* some params */ });
manager.getConnection()
.pipe(
mergeMap(connection => concat(
connection.execute('SELECT 1'),
connection.close() // explicitly close connection
)),
// alternatively
// take(1) // to close connection automatically
);
例如,您可以做的很酷的事情是在失败的情况下轻松重试连接:
oracle.getConnection()
.pipe(
retry(3)
...
);
我正在使用 node-oracledb 连接到 Oracle 数据库。 API 提供了自己的承诺,可以转换为 Promise<T>
,因此 "converted" 转换为 Observable<T>
。
使用 Observables,我想:
- 打开数据库连接
- Select N条记录
- 关闭数据库连接,即使#2 抛出异常。
使用传统的、阻塞的、程序化的方式,它会是这样的:
try
{
connection = Oracle.getConnection(...);
resultSet = connection.execute("SELECT ... FROM ...");
}
catch (Exception)
{
resultSet = EMPTY_RESULT;
}
finally
{
if (connection)
connection.close();
}
我尝试使用 Observables 编写此代码导致大量代码和回调。
受保护的方法 getConnection() 仍然非常简单:
import * as Oracle from "oracledb";
protected getConnection() : Observable<IConnection>
{
return OraUtil.from(Oracle.getConnection(this.parameters));
}
closeConnection()
方法也是如此。我在这里直接使用了 promise,以避免更多代码。
protected closeConnection(subscriber : Subscriber<IExecuteReturn>, connection : IConnection) : void
{
connection.close()
.then(() => subscriber.complete())
.catch((error) => subscriber.error());
}
但是 execute()
方法是麻烦的开始。
protected _execute(connection : IConnection, statement : string) : Observable<IExecuteReturn>
{
return new Observable<IExecuteReturn>(
(subscriber) => {
OraUtil.from(connection.execute(statement)).subscribe(
(result) => subscriber.next(result),
(error) => {
subscriber.error(error);
this.closeConnection(subscriber, connection);
},
() => {
this.closeConnection(subscriber, connection);
});
});
}
public execute(statement : string) : Observable<IExecuteReturn>
{
return this.getConnection().pipe(
flatMap((connection) => this._execute(connection, statement))
);
}
一般来说,当使用 RxJs 6.x 时,类似 "connect + execute + close" 序列的实现是沿着这些线
let connection;
Oracle.getConnection(....)
.pipe(
switchMap(conn => {
connection = conn;
return connection.execute(statement);
}),
finalize(() => connection.close())
)
.subscribe(
result => resultSet = result,
error => {
console.error(error);
}
)
语法细节可能不同,但关键思想是一旦连接 Observable 发出,然后你 switchMap
执行语句。当您订阅时,整个链就会被激活。在订阅中,您可以在出现错误或执行返回的 Observable 完成时关闭连接。
这就是我通常处理连接管理的方式。核心是 using
observable creator,它接受资源工厂作为第一个参数,设置函数作为第二个参数。
using(() => { unsubscribe() }, resource => observableOf(resource))
resource
是一个具有 unsubscribe
方法的对象,作为取消订阅的一部分被调用 - 因此您可以在那里隐藏任何逻辑,并有效地将任意对象的生命周期绑定到可观察对象的生命周期。
我希望下面的代码有意义。
import * as Oracle from "oracledb";
import { mergeMap , ignoreElements} from 'rxjs/operators';
import { using } from 'rxjs/observable/using';
import { from as observableFrom } from 'rxjs/observable/from';
import { concat } from 'rxjs/observable/concat';
import { defer } from 'rxjs/observable/defer';
import { empty as observableEmpty } from 'rxjs/observable/empty';
class OracleConnection {
constructor(parameters) {
this.isClosed = false;
this.connection = null;
this.parameters = parameters;
}
setup() {
return defer(() => Oracle.getConnection(this.parameters)
.then(connection => { // do this in promise in case observable gets unsubscribed before connection is established
this.connection = connection;
if (this.isClosed) { // close in case connection got already closed before even got established
this.terminate();
}
return connection;
}));
}
close() {
this.isClosed = true;
if (this.connection !== null) {
const connection = this.connection;
this.connection = null;
return observableFrom(connection.close())
.pipe(ignoreElements()) // only propagate errors
}
return observableEmpty(); // connection already closed
}
terminate() {
this.close().subscribe(/* handle error from connection close */);
}
unsubscribe() { // this will get called on observable unsubscribe
if (!this.isClosed) {
this.terminate();
}
}
}
class ConnectionManager {
constructor(params) {
this.params = params;
}
getConnection() {
return using(() => new OracleConnection(this.params), oracleConnection => oracleConnection.setup())
}
}
const manager = new ConnectionManager({ /* some params */ });
manager.getConnection()
.pipe(
mergeMap(connection => concat(
connection.execute('SELECT 1'),
connection.close() // explicitly close connection
)),
// alternatively
// take(1) // to close connection automatically
);
例如,您可以做的很酷的事情是在失败的情况下轻松重试连接:
oracle.getConnection()
.pipe(
retry(3)
...
);