如何在 RxJS 6 和 Angular 6 出错后保持 observable 存活

How to keep observable alive after error in RxJS 6 and Angular 6

任何人都可以帮助解决 this._getReactions$.next()this.http.get(...) 出现错误时无法正常工作的情况。我想让 observable 保持活动状态以获取下一个输入。

private _getReactions$: Subject<any> = new Subject();

 constructor() {
  this._getReactions$
  .pipe(
    switchMap(() => {
        return this.http.get(...)
        // http request 
    }),
    catchError(error => {
      console.log(error);
      return empty();
    })
  )
  .subscribe(data => {
      console.log(data)
      //results handling
  });
 }

onClick() {
  this._getReactions$.next();
}

如果 observable 死亡,它称之为错误处理程序并且它们已关闭,您不能通过它们发送任何东西,这意味着它们已关闭,包括间隔在内的所有上游都已死亡。

what if we want to live.

屏蔽主观察者链是解决方案
将 catch 放入 switchmap 每当请求被触发时 switchmap 创建 ajax 可观察对象,这次是 catch.
switchmap 有一个行为,它说我的消息来源 还没有完成所以我真的不在乎 child 完成我会继续。

 constructor() {
  this._getReactions$
    .pipe(tap(value => { this.loading = true; return value }),
      switchMap(() => {
        return this.http.get(...).pipe(
          catchError((error) => this.handleError(error)))
        // http request
      }),
    )
    .subscribe(data => {
      console.log(data)
      //results handling
      this.error = false;
      this.loading = false
    });
}

private handleError(error: HttpErrorResponse) {

  this.error = true;
  console.log(error)
  this.loading = false
  return empty();

Live Demo

Detailed Info

PS:嵌套在任何 flattening 运算符中,例如 mergeMapconcatMapexhaustMap 和其他展平运算符也可以。

我已经为所有请求

解决了这个问题

创建一个将执行所有请求的加载程序文件

loader.ts

import { Observable, Subject, Subscription, EMPTY } from 'rxjs';
import { catchError, map, switchMap } from 'rxjs/operators';

export class Loader<T1, T> {
  private _requestQueue: Subject<T1>;
  private _errorQueue: Subject<Error>;
  private _resultQueue: Observable<T>;
  private _loaded = false;

  constructor(loaderFunction: (T1) => Observable<T>) {
    this._requestQueue = new Subject<T1>();
    this._errorQueue = new Subject<Error>();
    this._resultQueue = this._requestQueue.pipe(
      switchMap(_ => {
        this._loaded = false;
        return loaderFunction(_).pipe(
          catchError(error => {
            this._loaded = true;
            this._errorQueue.next(error);
            // Returning EMPTY observable won't complete the stream
            return EMPTY;
          })
        );
      }),
      map(_ => {
        this._loaded = true;
        return _;
      }),
    );
  }

  public load(arg?: T1): void {
    this._requestQueue.next(arg);
  }

  public subscribe(successFn: (T) => any, errorFn?: (error: any) => void, 
    completeFn?: () => void): Subscription {
    
    this._errorQueue.subscribe(err => {
      errorFn(err);
    });
    return this._resultQueue.subscribe(successFn, null, completeFn);
  }

  public complete() {
    this._requestQueue.complete();
    this._errorQueue.complete();
  }
  
  get loaded(): boolean {
    return this._loaded;
  }
}

在您将执行请求的其他文件中(简单)

export class Component {
  readonly loader: Loader<ResponseType, RequestParamType>;

  constructor() {
    this.loader = new Loader(param => this.http.get(param));
    this.loader.subscribe(res => {
      // Your stuffs
    }, (error) => { 
      // Error Handling stuffs
    }, () => {
      // on Complete stuffs (Optional)
    });
  }

  ngOnInit() {
    this.loadData();
  }

  loadData() { // Call this function whenever you want to refresh the data
    this.loader.load(params); // this param will directly passed to the http request
  }
}

我在加载器中定义了其他参数,这可以帮助您喜欢加载状态和完成流的选项(在 ngOnDestroy 中)

编码愉快!

你可以使用这个场景,它非常适合我

 private _getReactions$: Subject<any> = new Subject();

 constructor() {
  this.init();
 }
 private init(){
   this._getReactions$
   .pipe(
    switchMap(() => {
        return this.http.get(...)
        // http request 
    }),
    catchError(error => {
      console.log(error);
      throw error;
    })
  )
  .subscribe(data => {
      console.log(data)
      //results handling
  }, () => this.init());
 }