RxJs switchMap 与 angular HttpClient

RxJs switchMap with angular HttpClient

我有一个用例,每当触发新请求时,任何已经在运行中的 http 请求都应该被取消/忽略。

例如

  • A request (say #2) comes in while the request #1 takes too long to respond / slow network connectivity.
  • In this case #2 gets a very quick response from server then at any point even if the #1 comes back with a response the HTTP response observable should be ignored.
  • The problem i face is, first the component displays response values from request #2 and gets updated again when req #1 completes (this should not happen).

我认为 switchMap 取消了 obervables / 维护了 observable 值发出的顺序。

摘自我的 service.ts

Obervable.of('myServiceUrl')
             .switchMap(url => this.httpRequest(url) )
              .subscribe( response => {
                   // implementation
                   /** Update an observable with the 
                       with latest changes from response. this will be 
                       displayed in a component as async */
                });


private httpRequest(url) {
        return this.httpClient.get('myUrl', { observe: 'response' });
}

以上实现无效。有人能找出这个用例的正确实现吗?

您似乎正在创建多个可观察对象。从您的示例中不清楚,但似乎您每次要发出请求时都会调用 Observable.of 。这每次都会创建一个 new Observable 流,因此对于每个后续调用,您都会获得一个新流,并且不会取消前一个流。这就是 .switchMap 不起作用的原因。

如果您希望.switchMap 取消 HTTP 请求,您需要它们使用 相同的 可观察流。您要使用的源 Observable 取决于触发 http 请求的确切原因,但您可以使用 Subject.

之类的内容自行管理它
const makeRequest$ = new Subject();
const myResponse$ = makeRequest$.pipe(switchMap(() => this.service.getStuff()));

您可以订阅myResponse$以获取回复。任何时候你想触发一个请求,你可以做 makeRequest$.next().

我有下面的代码摘录,其中 switchMap 实现成功。

class MyClass {
    private domain: string;
    private myServiceUri: subject;
    myData$: Observable<any>;

        constructor(private http: HttpClient) {
            .....
            this.myServiceUri = new Subject();
            this.myServiceUri.switchMap(uri => {
                    return this.http.get(uri , { observe: 'response' })
                            // we have to catch the error else the observable sequence will complete
                            .catch(error => {
                                // handle error scenario
                                return Obervable.empty(); //need this to continue the stream
                            });
                    })
                    .subscribe(response => {
                        this.myData$.next(response);
                    });
        }

        getData(uri: string) {
            this.myServiceUri.next(this.domain + uri); // this will trigger the request     
        }

    }