确保在发出下一个值之前发生订阅的最佳方法

Best way to ensure subscription happened before emitting next value

我正在使用 OAuth 2.0 为我的 Angular 应用程序中的用户检索令牌。我使用流行的库 angular-oauth2-oidc 来处理 OAuth 流程。因此,在我的 AppComponent 中,我从 angular-oauth2-oidc 设置了 oAuthService,然后通过调用 initCodeFlow() 在我的登录组件中启动 oAuth 流程。用户被重定向以输入他的凭据,然后他被重定向到我,我在 AppComponent 中收到令牌并将其作为下一个令牌放入我的 DataService TokenSubject。

但现在我想在我的 MainPageComponent 的 header 中使用该标记进行 API 调用。因此,我在我的 MainPage 中订阅了 TokenSubject,当我收到令牌时,订阅块中的代码将被执行。

但是如果令牌发送给我的速度比主页建立的速度快怎么办?然后,当 AppComponent 发出下一个 Token 值时,我还没有订阅主页中的 TokenSubject。我不确定,但我的主页有时打不开,我想这可能是原因。

在发出 TokenSubject 之前确保主页已订阅 TokenSubject 的最佳方法是什么?

我的AppComponent:

import { OAuthService } from 'angular-oauth2-oidc';
...

export class AppComponent implements OnInit {
    token: Token = {
      access_token: null,
      refresh_token: null
};

constructor(private oauthService: OAuthService,
                private adapterService: AdapterService,
                private dataService: DataService,
                private logger: NGXLogger) {
}

ngOnInit() {
        this.configureOAuthService();
        this.subscribeToOAuthEvents();
    }

configureOAuthService() {
        const authConfig: AuthConfig = {
            issuer: environment.oauthLoginUrl,
            redirectUri: environment.oauthRedirectUrl,
            clientId: environment.apiKey,
            scope: 'openid',
            responseType: 'code',
            showDebugInformation: true,
            disablePKCE: false,
            oidc: true,
            disableAtHashCheck: false,
        };

        this.oauthService.configure(authConfig);
        this.oauthService.tokenValidationHandler = new JwksValidationHandler();
        this.oauthService.loadDiscoveryDocumentAndTryLogin();
}

subscribeToOAuthEvents() {
        this.oauthService.events.subscribe(e => {
            if (e.type === 'token_received' || e.type === 'token_refreshed' ) {
                this.storeTokenInDataService();

                const self = this;
                // this will wait for 2 hours minus 30 sec before it refreshes the access token;
                setTimeout(() => { self.refreshToken(self); }, 7_170_000);
            } else if (e.type === 'token_expires') {
                this.logger.log('token_expires, going to refresh');
                this.refreshToken(this);
            } else if (e instanceof OAuthErrorEvent) {
                this.logger.error('OAuth error:', e);
            } else {
                this.logger.log('OAuthEvent:', e);
            }
        });
    }

    storeTokenInDataService() {
        this.token.access_token = this.oauthService.getAccessToken();
        this.token.refresh_token = this.oauthService.getRefreshToken();
        this.dataService.nextToken(this.token);
    }

在我的主页中,我订阅了 sharedToken,但我不确定这是否总是在令牌发出之前发生:

ngOnInit(): void {
  console.log('going to get the token from dataService');
  this.dataService.sharedToken
    .pipe(
      takeUntil(this.destroy$),
      tap(token => {
        this.token = token;
        console.log('got Token');
      }),
      switchMap(token => {
        this.headers = this.createHeaders(token.access_token);

        return this.adapterService.getUserInfo(this.headers);
      }))
    .subscribe(UserInfo => {
      this.logger.log('received userInfo : ', userInfo);
      ... 
  });
 }
}

在我的数据服务中它看起来像这样:

  private token = new Subject<Token>();
  sharedPToken = this.token ;

  nextToken(token: Token) {
    this.token.next(token);
  }

更新:问题解决了,我最终的主要组件是这样的:

ngOnInit(): void {
  this.dataService.sharedTokenReplaySubject
    .pipe(
      takeLast(1),
      tap(token => {
        this.token = token;
        console.log('got Token');
      }),
      switchMap(token => {
        this.headers = this.createHeaders(token.access_token);

        return this.adapterService.getUserInfo(this.headers);
      }))
    .subscribe(UserInfo => {
      this.logger.log('received userInfo : ', userInfo);
      ... 
  });
 }
}

简而言之:您不能用 Subject 来确保它。

您可以改用 RxJS ReplaySubject。它是您正在使用的更通用 Subject 的变体。然而,它可以“hold/buffer”特定数量的先前发射,并在订阅后立即发射给未来的订阅者。在您的情况下,您可以使用缓冲区 1。它将保留并发出最后推送给它的值。

private token = new ReplaySubject<Token>(1);  // <-- buffer 1
sharedToken = this.token.asObservable();

nextToken(token: Token) {
  this.token.next(token);
}

这样您就不必担心在订阅 sharedToken 之前或之后将值推送到 token。即使值在订阅之前被推送到 token,订阅 this.dataService.sharedToken 也会获得令牌。