RxJS issue with the concat operator when executing operations sequentially

I'm using RxJS 6 and running two asynchronous operations whose order matters.

This code works perfectly:

dbmsProxy.createDatastores().subscribe(() => {
    UsersDAO.insert(users).subscribe(() => {
        console.log('FINISHED ALL THE CHAIN');
    });
});

But when I try to use RxJS concat, I run into a problem: the second operation executes before the first one has finished:

concat([dbmsProxy.createDatastores(), UsersDAO.insert(users)]).subscribe();

The DBMSProxy methods are below:

public createDatastores(): Observable<string> {
    const _this: DBMSProxy = this;
    const subject = new Subject<string>();
    const subscription: Subscription = UsersDAO.createDatastore().subscribe(
        onSuccess,
        onError,
        onFinally
    );
    return subject;

    function onSuccess(datastore: Nedb): void {
        console.log(`USERS Datastore Created Successfully`);
        _this.db.users = datastore;
        subject.next('success');
    }

    function onError(err: string) {
        subject.error('error');
        console.error(err);
    }

    function onFinally() {
        subject.complete();
        subscription.unsubscribe();
    }
}

public insertDocuments(documents: any, datastore: Nedb): Subject<any> {
    const subject = new Subject<any>();
    datastore.insert(documents, onInsert);
    return subject;

    function onInsert(err: Error, newDocuments: any) {
        if (err) {
            subject.error(err);
        } else {
            // copy the ids that nedb generated on insert back onto the original documents
            documents.forEach((document: any, ind: number) => {
                document.id = newDocuments[ind]._id;
            });
            subject.next(documents);
        }
        subject.complete();
    }
}

And the UsersDAO methods:

public static createDatastore(): Subject<Nedb | string> {
    const subject = new Subject<Nedb | string>();
    const datastore = new Nedb({
        filename: USERS_DATASTORE_FULL_NAME,
        autoload: true,
        onload
    });
    return subject;

    function onload(err: Error) {
        if (err) {
            subject.error(
                `Error creating USERS datastore: ${err.name} - ${err.message}`
            );
        } else {
            subject.next(datastore);
        }
        subject.complete();
    }
}

public static insert(users: User[]): Observable<any> {
    return DBMSProxy.getInstance()
        .insertDocuments(users, DBMSProxy.getInstance().db.users)
        .pipe(catchError((val: any) => of('Error inserting the users')));
}

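Do you have any idea what is happening?

My current solution is to convert the Subject to an Observable, wrap the second call in a new Observable with defer, and remove the square brackets (otherwise I get the observables back instead of their results). This seems to work: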

const operations = concat(
    dbmsProxy.createDatastores().asObservable(),
    defer(() => UsersDAO.insert(users))
);
operations.subscribe(onSubscribe);
function onSubscribe(result: any) {
    console.log('Finished all: ', result);
}
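
My understanding of why this helps, as a minimal sketch that does not use RxJS at all: arguments are evaluated before the function they are passed to is called, so a method that starts its work when invoked has already started by the time concat gets to see it. The names TaskSource, eagerTask, deferTask and concatTasks below are hypothetical stand-ins I made up for illustration. They are a simplified model of the behaviour, not the RxJS API or its implementation.

```typescript
// A deliberately tiny model of an observable: calling the function
// "subscribes", runs the work, and then signals completion.
type TaskSource = (onComplete: () => void) => void;

const log: string[] = [];

// Eager factory (hypothetical): the work starts as soon as the factory is
// called, as with a method that kicks off an operation and returns a Subject.
function eagerTask(name: string): TaskSource {
    log.push(`${name} started`);
    return (onComplete) => onComplete();
}

// Simplified stand-in for defer: the factory runs only on subscription.
function deferTask(factory: () => TaskSource): TaskSource {
    return (onComplete) => factory()(onComplete);
}

// Simplified stand-in for concat: subscribe to each source only after the
// previous one has completed.
function concatTasks(...sources: TaskSource[]): TaskSource {
    return (onComplete) => {
        const run = (index: number): void => {
            if (index === sources.length) {
                onComplete();
                return;
            }
            log.push(`subscribing to source ${index}`);
            sources[index](() => run(index + 1));
        };
        run(0);
    };
}

// Case 1: both factories are called while the arguments are being evaluated,
// so both operations have started before anything subscribes.
concatTasks(eagerTask('create'), eagerTask('insert'));
const eagerLog = [...log];

// Case 2: wrapping the second call postpones it until its turn comes.
log.length = 0;
const deferred = concatTasks(
    eagerTask('create'),
    deferTask(() => eagerTask('insert'))
);
const beforeSubscribe = [...log];
deferred(() => {
    log.push('all done');
});
const afterSubscribe = [...log];

console.log(eagerLog);
console.log(beforeSubscribe);
console.log(afterSubscribe);
```

In the first case both "started" entries are logged even though nothing ever subscribes. In the second case "insert started" only appears after "subscribing to source 1", which is the ordering I wanted from the real code.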