Rxjs 在顺序执行操作时出现 concat 运算符问题
Rxjs issue with the concat operator when executing operations sequentially
我正在使用 rxjs 6,我正在执行两个顺序很重要的异步操作。
下面这段代码可以完美运行:
// Sequential chain built from nested subscriptions: UsersDAO.insert(users) is
// only called from inside the outer next-callback, i.e. after
// createDatastores() has emitted a value.
dbmsProxy.createDatastores().subscribe(() => {
UsersDAO.insert(users).subscribe(() => {
console.log('FINISHED ALL THE CHAIN');
});
});
但是当我尝试使用 rxjs 的 `concat` 时,却遇到了问题:第二个操作在第一个操作完成之前就执行了:
// NOTE(review): both createDatastores() and UsersDAO.insert(users) are invoked
// right here, while the array literal is being built, i.e. before subscribe()
// runs. UsersDAO.insert reads DBMSProxy.getInstance().db.users at call time,
// whereas createDatastores only assigns db.users later, in its success callback.
// concat() also receives ONE array argument instead of the two observables as
// separate arguments.
concat([dbmsProxy.createDatastores(), UsersDAO.insert(users)]).subscribe();
下面是 DBMSProxy 的方法:
/**
 * Creates the USERS datastore and stores it on `this.db.users`.
 *
 * The work starts as soon as this method is called (not when the result is
 * subscribed to). The returned stream is backed by a plain `Subject`, so
 * callers should subscribe right away to be sure of seeing the notification.
 *
 * @returns A stream that emits `'success'` once the datastore has been stored
 *   and then completes, or errors with the string `'error'` if creation failed
 *   (the detailed message is written to `console.error`).
 */
public createDatastores(): Observable<string> {
  const subject = new Subject<string>();

  // Arrow callbacks capture the lexical `this`, so no `_this` alias is needed.
  UsersDAO.createDatastore().subscribe(
    (datastore: Nedb): void => {
      console.log(`USERS Datastore Created Successfully`);
      this.db.users = datastore;
      subject.next('success');
    },
    (err: string): void => {
      subject.error('error');
      console.error(err);
    },
    (): void => {
      // No manual unsubscribe here: the subscription is torn down automatically
      // once the source completes, and referencing the subscription variable
      // from this callback would fail if the source ever completed
      // synchronously (the variable would not be initialised yet).
      subject.complete();
    }
  );

  return subject;
}
/**
 * Inserts `documents` into `datastore` and reports the outcome on a `Subject`.
 *
 * The insert is started immediately when this method is called. On success the
 * generated `_id` of every inserted document is copied onto the corresponding
 * input document as `id` (the input is mutated in place).
 *
 * @param documents A single document or an array of documents to insert.
 * @param datastore The Nedb datastore to insert into.
 * @returns A subject that emits the (mutated) `documents` and completes, or
 *   errors with the error passed to the insert callback.
 */
public insertDocuments(documents: any, datastore: Nedb): Subject<any> {
  const subject = new Subject<any>();

  datastore.insert(documents, (err: Error, newDocuments: any): void => {
    if (err) {
      // An errored subject is already terminated; no complete() afterwards.
      subject.error(err);
      return;
    }

    // Copy the ids generated by nedb back onto the caller's documents.
    if (Array.isArray(documents)) {
      documents.forEach((document: any, ind: number) => {
        document.id = newDocuments[ind]._id;
      });
    } else {
      // Single-document insert: the callback receives one document, not a list.
      documents.id = newDocuments._id;
    }

    subject.next(documents);
    subject.complete();
  });

  return subject;
}
下面是 UsersDAO 的方法:
/**
 * Instantiates the USERS Nedb datastore (with `autoload` enabled) and reports
 * the load result on a `Subject`.
 *
 * @returns A subject that emits the datastore instance when the `onload`
 *   callback reports no error, or errors with a descriptive string otherwise.
 */
public static createDatastore(): Subject<Nedb | string> {
  const subject = new Subject<Nedb | string>();

  // Passed to Nedb as its `onload` option; receives the load error, if any.
  const onload = (err: Error): void => {
    if (!err) {
      subject.next(datastore);
    } else {
      subject.error(
        `Error creating USERS datastore: ${err.name} - ${err.message}`
      );
    }
    subject.complete();
  };

  const datastore: Nedb = new Nedb({
    filename: USERS_DATASTORE_FULL_NAME,
    autoload: true,
    onload
  });

  return subject;
}
/**
 * Inserts the given users into the USERS datastore held by the `DBMSProxy`
 * singleton.
 *
 * The datastore reference (`db.users`) is read at the moment this method is
 * called, not when the returned observable is subscribed to.
 *
 * @param users The users to insert.
 * @returns The insert stream; an insert error is replaced by the single value
 *   `'Error inserting the users'`.
 */
public static insert(users: User[]): Observable<any> {
  const proxy = DBMSProxy.getInstance();
  const usersDatastore = DBMSProxy.getInstance().db.users;
  const inserted = proxy.insertDocuments(users, usersDatastore);

  return inserted.pipe(
    catchError((_err: any) => of('Error inserting the users'))
  );
}
请问您知道发生了什么事吗?
我目前的解决方案是:将 `Subject` 转换为 `Observable`,用 `defer` 把第二个操作包装成一个新的 Observable,并去掉方括号(否则得到的是 observable 本身,而不是它的结果)。这样似乎可行:
// Workaround: the two operations are passed to concat() as separate arguments,
// and the call to UsersDAO.insert(users) sits inside the factory callback given
// to defer() instead of being invoked while the argument list is built.
const operations = concat(
dbmsProxy.createDatastores().asObservable(),
defer(() => UsersDAO.insert(users))
);
operations.subscribe(onSubscribe);
// Handler passed to operations.subscribe(); logs the value it receives.
function onSubscribe(result: any) {
console.log('Finished all: ', result);
}
我正在使用 rxjs 6,我正在执行两个顺序很重要的异步操作。
下面这段代码可以完美运行:
// Sequential chain built from nested subscriptions: UsersDAO.insert(users) is
// only called from inside the outer next-callback, i.e. after
// createDatastores() has emitted a value.
dbmsProxy.createDatastores().subscribe(() => {
UsersDAO.insert(users).subscribe(() => {
console.log('FINISHED ALL THE CHAIN');
});
});
但是当我尝试使用 rxjs 的 `concat` 时,却遇到了问题:第二个操作在第一个操作完成之前就执行了:
// NOTE(review): both createDatastores() and UsersDAO.insert(users) are invoked
// right here, while the array literal is being built, i.e. before subscribe()
// runs. UsersDAO.insert reads DBMSProxy.getInstance().db.users at call time,
// whereas createDatastores only assigns db.users later, in its success callback.
// concat() also receives ONE array argument instead of the two observables as
// separate arguments.
concat([dbmsProxy.createDatastores(), UsersDAO.insert(users)]).subscribe();
下面是 DBMSProxy 的方法:
/**
 * Creates the USERS datastore and stores it on `this.db.users`.
 *
 * The work starts as soon as this method is called (not when the result is
 * subscribed to). The returned stream is backed by a plain `Subject`, so
 * callers should subscribe right away to be sure of seeing the notification.
 *
 * @returns A stream that emits `'success'` once the datastore has been stored
 *   and then completes, or errors with the string `'error'` if creation failed
 *   (the detailed message is written to `console.error`).
 */
public createDatastores(): Observable<string> {
  const subject = new Subject<string>();

  // Arrow callbacks capture the lexical `this`, so no `_this` alias is needed.
  UsersDAO.createDatastore().subscribe(
    (datastore: Nedb): void => {
      console.log(`USERS Datastore Created Successfully`);
      this.db.users = datastore;
      subject.next('success');
    },
    (err: string): void => {
      subject.error('error');
      console.error(err);
    },
    (): void => {
      // No manual unsubscribe here: the subscription is torn down automatically
      // once the source completes, and referencing the subscription variable
      // from this callback would fail if the source ever completed
      // synchronously (the variable would not be initialised yet).
      subject.complete();
    }
  );

  return subject;
}
/**
 * Inserts `documents` into `datastore` and reports the outcome on a `Subject`.
 *
 * The insert is started immediately when this method is called. On success the
 * generated `_id` of every inserted document is copied onto the corresponding
 * input document as `id` (the input is mutated in place).
 *
 * @param documents A single document or an array of documents to insert.
 * @param datastore The Nedb datastore to insert into.
 * @returns A subject that emits the (mutated) `documents` and completes, or
 *   errors with the error passed to the insert callback.
 */
public insertDocuments(documents: any, datastore: Nedb): Subject<any> {
  const subject = new Subject<any>();

  datastore.insert(documents, (err: Error, newDocuments: any): void => {
    if (err) {
      // An errored subject is already terminated; no complete() afterwards.
      subject.error(err);
      return;
    }

    // Copy the ids generated by nedb back onto the caller's documents.
    if (Array.isArray(documents)) {
      documents.forEach((document: any, ind: number) => {
        document.id = newDocuments[ind]._id;
      });
    } else {
      // Single-document insert: the callback receives one document, not a list.
      documents.id = newDocuments._id;
    }

    subject.next(documents);
    subject.complete();
  });

  return subject;
}
下面是 UsersDAO 的方法:
/**
 * Instantiates the USERS Nedb datastore (with `autoload` enabled) and reports
 * the load result on a `Subject`.
 *
 * @returns A subject that emits the datastore instance when the `onload`
 *   callback reports no error, or errors with a descriptive string otherwise.
 */
public static createDatastore(): Subject<Nedb | string> {
  const subject = new Subject<Nedb | string>();

  // Passed to Nedb as its `onload` option; receives the load error, if any.
  const onload = (err: Error): void => {
    if (!err) {
      subject.next(datastore);
    } else {
      subject.error(
        `Error creating USERS datastore: ${err.name} - ${err.message}`
      );
    }
    subject.complete();
  };

  const datastore: Nedb = new Nedb({
    filename: USERS_DATASTORE_FULL_NAME,
    autoload: true,
    onload
  });

  return subject;
}
/**
 * Inserts the given users into the USERS datastore held by the `DBMSProxy`
 * singleton.
 *
 * The datastore reference (`db.users`) is read at the moment this method is
 * called, not when the returned observable is subscribed to.
 *
 * @param users The users to insert.
 * @returns The insert stream; an insert error is replaced by the single value
 *   `'Error inserting the users'`.
 */
public static insert(users: User[]): Observable<any> {
  const proxy = DBMSProxy.getInstance();
  const usersDatastore = DBMSProxy.getInstance().db.users;
  const inserted = proxy.insertDocuments(users, usersDatastore);

  return inserted.pipe(
    catchError((_err: any) => of('Error inserting the users'))
  );
}
请问您知道发生了什么事吗?
我目前的解决方案是:将 `Subject` 转换为 `Observable`,用 `defer` 把第二个操作包装成一个新的 Observable,并去掉方括号(否则得到的是 observable 本身,而不是它的结果)。这样似乎可行:
// Workaround: the two operations are passed to concat() as separate arguments,
// and the call to UsersDAO.insert(users) sits inside the factory callback given
// to defer() instead of being invoked while the argument list is built.
const operations = concat(
dbmsProxy.createDatastores().asObservable(),
defer(() => UsersDAO.insert(users))
);
operations.subscribe(onSubscribe);
// Handler passed to operations.subscribe(); logs the value it receives.
function onSubscribe(result: any) {
console.log('Finished all: ', result);
}