How can I rewrite this code using RxJS observables instead of async/await?

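I need a create-user function that returns Observable<UserEntity>, but inside this function I also have to run two database queries to check whether the user already exists. The code below uses async/await and looks quite clean. The problem is that I use RxJS everywhere in this project and would like to write this with RxJS as well. Can it be as clean as it is now, but with Observables?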

async create(user: CreateUserDTO): Promise<UserEntity> {
    const userByEmail = await this.getUserByEmail();

    const userByLogin = await this.getUserByLogin();
    if (userByLogin || userByEmail) {
        // return error here
    }

    return await this.createUser(user);
}

Assuming that this.getUserByEmail(), this.getUserByLogin() and this.createUser(user) return Promises, the code could look like this:

import { Observable, forkJoin, from } from 'rxjs';
import { concatMap } from 'rxjs/operators';

create(user: CreateUserDTO): Observable<UserEntity> {
    // with the rxjs from function we turn a Promise into an Observable
    const userByEmail$ = from(this.getUserByEmail());
    const userByLogin$ = from(this.getUserByLogin());

    // with forkJoin we create an Observable which emits once all the
    // Observables which have been passed in as parameters have completed
    return forkJoin([userByEmail$, userByLogin$]).pipe(
       // with concatMap you wait for the upstream Observable (i.e. the
       // Observable created by forkJoin) to notify, and then
       // you return the next Observable in the chain, which is, in this case,
       // the Observable which (when subscribed) creates the user
       // (the results arrive in the same order as the array passed to forkJoin)
       concatMap(([userByEmail, userByLogin]) => {
          if (userByLogin || userByEmail) {
              // throw error here
          }
          return from(this.createUser(user))
       })
    )
}
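One detail about from is worth a small illustration: from(this.getUserByEmail()) calls the Promise-returning function immediately, when create is called, not when someone subscribes. If lazy behaviour matters, defer can wrap the call instead. The sketch below is only an illustration; the getUserByEmail stub and the queries counter are hypothetical stand-ins, not code from the question.

```typescript
import { defer, from } from 'rxjs';

// Hypothetical counter: how many times the "database" was queried.
let queries = 0;

// Hypothetical stand-in for a Promise-returning lookup such as getUserByEmail.
function getUserByEmail(): Promise<string | undefined> {
  queries += 1;
  return Promise.resolve(undefined);
}

// from(promise): the lookup runs right here, before anyone subscribes.
const eager$ = from(getUserByEmail());
eager$.subscribe();
eager$.subscribe();
const afterEager = queries; // 1: both subscribers share the same Promise

// defer(factory): the lookup runs once per subscription, and only then.
const lazy$ = defer(() => getUserByEmail());
const afterDefer = queries; // still 1: nothing has subscribed to lazy$ yet

lazy$.subscribe();
lazy$.subscribe();
const afterLazy = queries; // 3: one new query per subscription
```

Whether the eager or the lazy behaviour is preferable depends on how create is used; if it is always subscribed right away, the difference is unlikely to matter.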

Otherwise, if this.getUserByEmail(), this.getUserByLogin() and this.createUser(user) return Observables, you do not need the rxjs from function and the code becomes slightly simpler, like this:

create(user: CreateUserDTO): Observable<UserEntity> {
    return forkJoin([this.getUserByEmail(), this.getUserByLogin()]).pipe(
       concatMap(([userByEmail, userByLogin]) => {
          if (userByLogin || userByEmail) {
              // throw error here
          }
          // createUser already returns an Observable, so no from is needed
          return this.createUser(user)
       })
    )
}
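To see the Observable-returning variant run end to end, here is a minimal self-contained sketch. Everything outside the forkJoin/concatMap chain is an assumption made for illustration: the CreateUserDTO and UserEntity shapes, the in-memory users array, and the stub lookups that take an argument. An error thrown inside the concatMap callback is delivered to the subscriber's error handler.

```typescript
import { Observable, forkJoin, of } from 'rxjs';
import { concatMap } from 'rxjs/operators';

// Hypothetical shapes standing in for the question's CreateUserDTO / UserEntity.
interface CreateUserDTO { email: string; login: string; }
interface UserEntity extends CreateUserDTO { id: number; }

// Hypothetical in-memory "database" so the sketch runs without a backend.
const users: UserEntity[] = [{ id: 1, email: 'a@example.com', login: 'alice' }];

const getUserByEmail = (email: string): Observable<UserEntity | undefined> =>
  of(users.find((u) => u.email === email));

const getUserByLogin = (login: string): Observable<UserEntity | undefined> =>
  of(users.find((u) => u.login === login));

const createUser = (user: CreateUserDTO): Observable<UserEntity> => {
  const entity: UserEntity = { id: users.length + 1, ...user };
  users.push(entity);
  return of(entity);
};

function create(user: CreateUserDTO): Observable<UserEntity> {
  return forkJoin([getUserByEmail(user.email), getUserByLogin(user.login)]).pipe(
    concatMap(([userByEmail, userByLogin]) => {
      if (userByEmail || userByLogin) {
        // Thrown errors become an error notification for the subscriber.
        throw new Error('User exists!');
      }
      return createUser(user);
    })
  );
}

// Usage: one new user, then one whose email is already taken.
const log: string[] = [];
const observer = {
  next: (u: UserEntity) => log.push(`created ${u.login}`),
  error: (e: Error) => log.push(`error: ${e.message}`),
};
create({ email: 'b@example.com', login: 'bob' }).subscribe(observer);
create({ email: 'a@example.com', login: 'carol' }).subscribe(observer);
// log is now ['created bob', 'error: User exists!']
```

The stubs use of, so everything here runs synchronously; with real database calls the same chain would simply emit later.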

I am using RxJS 6.5.

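forkJoin emits its result once both async functions, getUserByEmail and getUserByLogin, have completed.

If getUserByEmail and getUserByLogin return Promises, use from to convert each Promise into an Observable.

mergeMap subscribes to the inner Observable. In our case, the inner Observable is the one that creates the user.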

create(user: CreateUserDTO): Observable<UserEntity> {
    // If getUserByEmail & getUserByLogin return Promises
    const getUserByEmail$ = from(this.getUserByEmail());
    const getUserByLogin$ = from(this.getUserByLogin());

    // If both return Observables
    // const getUserByEmail$ = this.getUserByEmail();
    // const getUserByLogin$ = this.getUserByLogin();

    // The object form of forkJoin emits an object with the same keys
    return forkJoin({
        userByEmail: getUserByEmail$,
        userByLogin: getUserByLogin$,
    }).pipe(
        tap((res) => {
            if (res.userByEmail || res.userByLogin) {
                throw new Error('User exists!');
            }
        }),
        mergeMap(() => {
            return from(this.createUser(user));
            // If createUser returns an Observable, then
            // return this.createUser(user);
        })
    );
}
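One caveat applies if the lookups return Observables rather than Promises: forkJoin only emits when every source has emitted at least one value. If a lookup signals "not found" by completing without emitting, forkJoin completes silently and the user is never created. The sketch below illustrates this with hypothetical stubs (findNothing, findSomething) and shows defaultIfEmpty as one possible way to turn "not found" into an explicit null.

```typescript
import { EMPTY, Observable, forkJoin, of } from 'rxjs';
import { defaultIfEmpty } from 'rxjs/operators';

// Hypothetical lookup that reports "not found" by completing without a value.
const findNothing = (): Observable<string | null> => EMPTY;
// Hypothetical lookup that finds a user.
const findSomething = (): Observable<string | null> => of('alice');

// Case 1: one source is empty, so forkJoin completes without calling next.
const withoutDefault: string[] = [];
forkJoin({ byEmail: findNothing(), byLogin: findSomething() }).subscribe({
  next: (res) => withoutDefault.push(`next byEmail=${res.byEmail} byLogin=${res.byLogin}`),
  complete: () => withoutDefault.push('complete'),
});
// withoutDefault is ['complete']

// Case 2: defaultIfEmpty supplies null for the empty source, so forkJoin emits.
const withDefault: string[] = [];
forkJoin({
  byEmail: findNothing().pipe(defaultIfEmpty(null)),
  byLogin: findSomething(),
}).subscribe({
  next: (res) => withDefault.push(`next byEmail=${res.byEmail} byLogin=${res.byLogin}`),
  complete: () => withDefault.push('complete'),
});
// withDefault is ['next byEmail=null byLogin=alice', 'complete']
```

With Promise-based lookups wrapped in from this does not arise, because a resolved Promise always produces exactly one value, even if that value is undefined.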