依赖嵌套的可观察对象

Dependent nested observables

我有两种方法,其中我正在执行一些异步操作,但我不希望方法 2 执行,直到或除非方法 1 的响应是肯定的,以表明我们可以进一步进行。

所以,这是我尝试过的:

方法一:

private method1(): Observable<any> {
  return new Observable(() => {
    executingSomething();
    this.anotherSubscription.pipe(
      map(x => {
        console.log('Control is not reaching here');
        Also, how to indicate the caller that method2 can be executed?
        I can't return anything since I'm already returning the new Observable, above.
      })
    );
  });
}

来电者:

concat(this.method1(), this.method2()).subscribe();

问题:anotherSubscription 甚至没有被执行,我想不出任何方法将 anotherSubscription 中的响应传递给调用者。

我觉得我没有在正确的意义上使用 observables,但似乎无法在任何地方找到任何东西。

您可以显式return将serviceN的值设为serviceN+1。想法是这样的:

private setupStuff() {
  this.initRouteParams()
    .pipe(
      switchMap(serviceId => {
        return zip(of(serviceId), this.getFileInfo(serviceId))
      }),
      switchMap(([serviceId, filename]) => {
        return zip(of(serviceId), of(filename), this.getExistingFile(serviceId, filename))
      })
    )
    .subscribe(([serviceId, filename, response]) => {
      console.log(serviceId, filename, response);
    })
}

"I don't want method 2 to execute until or unless the response from method 1 is positive"

如果您只想从 obs2 发射,在 obs1 发射后触发(无论 obs1 发射什么),只需使用 switchMap:

fromEvent(document, 'click')
  .pipe(
    switchMap(() => of([1]))
  )
  .subscribe(console.log); <--- will emit [1] only after the user clicks.

如果您需要 obs2 仅在 obs1 发出特定值后发出,请将 switchMap 与过滤器运算符一起使用 (https://www.learnrxjs.io/learn-rxjs/operators/filtering/filter):

const obs$ = of(true); 
const obs2$ = of([1,2,3]);

    obs$.pipe(
        filter((result) => result === true),
        switchMap(() => obs2$)
      ).subscribe(console.log) <--- returns [1,2,3] since filter condition returns true

如果您需要类似 If Else 语句的内容,请使用条件运算符 IIF (https://www.learnrxjs.io/learn-rxjs/operators/conditional/iif):

const even$ = of('even');
const odd$ = of('odd');

interval(1000).pipe(
  mergeMap(v =>
    iif(
      () => v % 2 === 0,
      even$,
      odd$
    ))
).subscribe(console.log); <--- will emit every second a string "even" or "odd" based on the condition.

在编写解决方案之前,我对您的代码做了一些假设。如果假设是错误的,那么答案也可能是错误的。

  1. executingSomething() 是一个具有不相关功能的 同步 方法,除了你希望它抛出时整个事情都失败。
  2. anotherSubscription 不是 Subscribtion 而是 Observable(它们是不同的东西)。

以下是我如何解决您的问题:

class SomeClass {
  private method1(): Observable<any> {
    try {
      executingSomething();
    } catch (err) {
      // This would return an errorred observable, which is great, since
      // you can still subscribe to it (no need to change the return type
      // of this method).
      return throwError(err);
    }

    return this.anotherSubscription.pipe(
      tap(() => {
        console.log('Controls is reaching here!');
      }),
    );
  }

  private method2(): Observable<any> {
    // This can be whatever obervable, using of(null) for demo purposes.
    return of(null);
  }

  private parentMethod() {
    this.method1()
      .pipe(
        switchMap(valueFromAnotherSubscription => {
          // UPD1 - Implement your custom check here. This check will determine
          // whether `method2` will be called.
          if (valueFromAnotherSubscription === targetValue) {
            return this.method2();
          } else {
            // UPD1 - If the check evaluates to `false`, reemit the same value
            // using the `of` operator.
            return of(valueFromAnotherSubscription);
          }
        }),
      )
      .subscribe(() => {
        console.log('Done!');
      });
  }
}

正如 Kevin 所正确指出的那样,这里的关键运算符是 switchMap。每次 observable (anotherSubscription) 发出时,switchMap 将取消它并订阅不同的 observable(无论从 method2 返回什么)。

这是按顺序发生的,所以 method2 只会在 method1 发出后被订阅。如果 method1 抛出,整个管道将失败。您也可以 filter method1 的结果来决定是否要切换到 method2

此外,在大多数情况下可能不需要使用 new Observable 构造可观察对象。我有一个相当大的 RxJS 应用程序,到目前为止从未使用过它。

更新 1

查看带有 UPD1 注释的代码。

请记住,如果 anotherSubscription 抛出错误,switchMap 函数 将不会被调用 无论如何。