如何仅在触发多个其他事件后才使用 RxJS 和 Observables 执行 http 请求?

How do I use RxJS & Observables to do an http request only after multiple other events have fired?

我无法将两个不同 Observable 的发出值组合起来用于 HTTP 请求,然后返回请求的 Observable 供客户端使用。

一些背景:我正在开发 Twitch 扩展程序。他们的生态系统的一部分是扩展通过事件回调接收环境信息。我感兴趣的位于window.Twitch.ext.onAuthorized()window.Twitch.ext.configuration.onChanged()(如果有兴趣,请看这里了解更多详情:https://dev.twitch.tv/docs/extensions/reference#helper-extensions)。

调用我的后端时,我需要上述两个事件的信息。这些不会经常更改(如果有的话),但在它们都可用之前我无法拨打电话,并且我想在拨打电话时获得最近提供的价值。看起来 BehaviorSubjects 很适合这个:

export class TwitchService {
  private authSubject: BehaviorSubject<TwitchAuth> = new BehaviorSubject<TwitchAuth>(null);
  private configSubject: BehaviorSubject<TwitchConfig> = new BehaviorSubject<TwitchConfig>(null);
  private helper: any;

  constructor(private window: Window) {
    this.helper = this.window['Twitch'].ext;
    this.helper.onAuthorized((auth: TwitchAuth) => {
      this.authSubject.next(auth);
    });
    this.helper.configuration.onChanged(() => {
      this.configSubject.next(JSON.parse(this.helper.configuration.global.content));
    });
  }

  onAuthorized(): Observable<TwitchAuth> {
    return this.authSubject.asObservable().pipe(filter((auth) => !!auth));
  }

  onConfig(): Observable<TwitchConfig> {
    return this.configSubject.asObservable().pipe(filter((config) => !!config));
  }
}

此模型适用于订阅这两个 Observable 之一的应用程序部分。我的问题是我找不到一种有效的方法来组合它们并使用两者最新发出的值来为 http 请求创建一次性 Observable

这是我目前的情况:

type twitchStateToObservable<T> = (auth: TwitchAuth, config: TwitchConfig) => Observable<T>;

export class BackendService {
  constructor(private http: HttpClient, private twitch: TwitchService) {}

  private httpWithTwitchState<T>(f: twitchStateToObservable<T>): Observable<T> {
    let subject = new Subject<T>();
    combineLatest(this.twitch.onAuthorized(), this.twitch.onConfig()).pipe(
      first(), // because we only want to make the http request one time
      map(([auth, config]) => f(auth, config).subscribe((resp) => subject.next(resp)))
    );
    return subject.asObservable();
  }

  exampleHttpRequest(): Observable<Response> {
    return this.httpWithTwitchState((auth, config) =>
      // this.url() and this.headers() are private functions of this service
      this.http.get<Response>(this.url(config) + 'exampleHttpRequest', { headers: this.headers(auth, config)})
    );
  }
}

然后,此服务的客户端应该能够通过这个简单的调用发出 http 请求,而无需了解有关事件的任何信息或关心它们何时触发:

this.backend.exampleHttpRequest().subscribe((resp) => {
  // do stuff with resp here...but this is never hit
}

根据我的理解,只要任一输入 Observable 发出新值,combineLatest() 就应该发出新值。但是,在我的应用程序中从未触发 map() 内部的调用 f(auth, config) 。我已经使用断点、console.logs 并通过关注浏览器调试器工具中的“网络”选项卡对其进行了测试。对 first() 的调用可能会取消它,但出于显而易见的原因,如果事件再次触发,我不想重复 http 请求。

在此先感谢您的任何建议或指点!

你可以把 c​​oncat 想象成 ATM 上的一条线,下一个交易(订阅)要等到上一个交易完成后才能开始!

示例:https://stackblitz.com/edit/typescript-3qsfgk?file=index.ts&devtoolsheight=100

样本:

const mergeValue = concat(authSubject,configSubject) 

mergeValue.subscribe( (x) => {
  // do stuff with resp here
});

好的,原来我遗漏了一些明显的东西。我没有订阅 combineLatest() 调用,所以它从未被执行过。在末尾添加 subscribe() 后,效果很好:

private httpWithTwitchState<T>(f: twitchStateToObservable<T>): Observable<T> {
  let subject = new Subject<T>();
  combineLatest(this.twitch.onAuthorized(), this.twitch.onConfig()).pipe(
      first(),
      map(([auth, config]) => f(auth, config).subscribe((resp) => subject.next(resp)))
  ).subscribe(); // adding this fixed the problem
  return subject.asObservable();
} 

这样做之后,我对如何在这种情况下 un-nest 订阅进行了更多研究,这使我找到了 flatMap() 运算符。所以现在 httpWithTwitchState() 函数看起来像这样:

private httpWithTwitchState<T>(f: twitchStateToObservable<T>): Observable<T> {
  return combineLatest(this.twitch.onAuthorized(), this.twitch.onConfig()).pipe(
    first(),
    map(([auth, config]) => f(auth, config)),
    flatMap((resp) => resp)
  );
}

这现在按预期工作并且更干净。感谢那些花时间阅读我的问题的人 and/or 回复。