暂停和恢复可观察流,请提出更好的选择

Pausing and resuming an observable stream, please suggest better options

我需要从可观察对象中收听项目流。当某些情况出现时,将对项目执行异步任务,并且组件将 'busy' 直到完成。我想暂停处理订阅中的项目,直到此任务完成(因为以下项目的处理取决于结果),然后从序列中的下一个项目继续,而不会有任何损失。

下一部分最好在看 Plunk 时阅读 here

为此,我使用了 buffer with a swtichMap。我以为这些可以自己完成这项工作,但 switchMap 会破坏并重新创建订阅,每次都会重置序列。

export class AppComponent implements OnInit {
    source$: Observable<any>;
    clearBuffer$ = new Subject();
    busy$ = new Subject();

    private itemSubscription: Subscription;
    private stayAliveSubscription: Subscription;

    items: any[] = [];

    constructor() { }

    ngOnInit() {
      this.source$ = Observable.range(1, 500).zip(
        Observable.interval(500),
        function (x, y) { return x; }
      ).share();

      this.busy$
        .subscribe(result => {
          if (!result) {
            this.clearBuffer$.next();
          }
        }, error => {
          console.log(error);
        });
    }

    start() {
      if (!this.itemSubscription) {
        this.itemSubscription =
          this.busy$.switchMap(busy => {
            if (busy) {
              return this.source$.buffer(this.clearBuffer$);
            } else {
              return this.source$;
            }
          })
            .subscribe(items => {
              if (Array.isArray(items)) {
                this.items.push('buffered: ' + items.join());
              } else {
                this.items.push('live feed: ' + items);
              }
            }, error => {
              this.items.push(error);
            });

        this.stayAliveSubscription = this.source$
          .subscribe(result => {
            console.log(result);
          }, error => {
            console.log(error);
          });

        this.busy$.next(false);
      }
   }
...
}

为了解决这个问题,现在共享 source$ observable 并启动单独的订阅 (stayAliveSubscription),因此使用单个订阅始终。这对我来说似乎很混乱,我想问问是否有人可以告诉我 better/alternative 解决问题的方法。

我将工作示例放在 Plunk 中 here 单击开始以开始订阅,然后 set/unset 繁忙切换到缓冲并继续。

编辑:使用 concatMap

的工作代码

我将 Plunk 更改为使用 concatMap。我也粘贴了下面的代码。关键是 concatMap 中的繁忙可观察对象 return 必须完成,您不能多次 return busy$ 可观察对象并在忙碌状态更改时调用 next。

    source$: Observable<any>;
    busy$ = new Subject();
    busy: boolean;

    private itemSubscription: Subscription;
    private stayAliveSubscription: Subscription;

    items: any[] = [];

    constructor() { }

    ngOnInit() {
      this.source$ = Observable.range(1, 500).zip(
        Observable.interval(500),
        function (x, y) { return x; }
      );

      this.busy$
        .subscribe(busy => {
          this.busy = <any>busy;
        });
    }

    start() {
      if (!this.itemSubscription) {
        this.itemSubscription = this.source$.concatMap(item => {
          const busySubject = new Subject();
          this.busy$
            .subscribe(result => {
              busySubject.next(item);
              busySubject.complete();
            });

          if (this.busy) {
            return busySubject;
          } else {
            return Observable.of(item);
          }

        })
          .subscribe(item => {
            this.items.push(item);
          }, error => {
            this.items.push(error);
          });
      }

      this.setBusy(false);
    }

我不完全明白你想做什么,但如果只是保持发出值的顺序而 "async task" 可能需要很长时间(随机),我我猜你可以使用 concatMap 运算符。

理论

concatMap

Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next.

练习

在此示例中,srcObservable 每 100 毫秒发出一个值,每个值都映射到一个新的可观察值,该值在 0 到 2000 毫秒之间发出一个值(异步任务)。可以看到订单是安全的。

let src = Rx.Observable.timer(0,100);
src.concatMap(i=>{
  return Rx.Observable.timer(Math.random()*2000).mapTo(i); // this is the async task
}).subscribe(data=>console.log(data));
<script src="https://unpkg.com/rxjs@5.4.0/bundles/Rx.min.js"></script>

制作热门 Observable

您也不应该使用这些订阅来让您的可观察对象发出数据。实际上你应该使用 .publish().connect() 而不是 share()subscribe() 来转换你的 cold observable to a hot one :

this.source$ = Observable.range(1, 500).zip(
        Observable.interval(500),
        function (x, y) { return x; }
      ).publish();
// blah blah blah some code
this.source$.connect();

delayWhen 是一个非常强大的运算符。我的解决方案使用 mergeMapdelayWhen.

功能:重试、限制、暂停、恢复

  1. 创建并订阅 Observable
const concurrentLimit = 5
const retryLimit = 10
const source$ = from(new Array(100).fill(0).map((_, i) => i))
// remove <boolean> if not typescript
const pause$ = new BehaviorSubject<boolean>(false);
const pass$ = pause$.pipe(filter((v) => !v));

const throttledTask$ = source$.pipe(
  mergeMap((item) => {
    return of(item).pipe(
      delayWhen(() => pass$),
      mergeMap(async (item) => {
         // you can also throw some errors
         return await new Promise((resolve)=>
             setTimeout(resolve(item), Math.random()*1000))
      }),
      retryWhen((errors$) => errors$.pipe(delay(1000), take(retryLimit)))
    );
  }, concurrentLimit)

const subscription = throttledTask$.subscribe(x => console.log(x))
  1. 添加 Pause/Resume 个事件处理程序
const pause = () => { pause$.next(true) }
const resume = () => { pause$.next(false) }

解释:

  1. delayWhen 将暂停流并等待 pass$ 信号发出。
  2. BehaviorSubject用于组合pass$信号,订阅时会发出最后一个值。
  3. mergeMap可以处理异步任务,有并发线程数限制参数。当 delayWhen 暂停流时,该流将保留在 mergeMap 内并占用并发 'thread'.
  4. retryWhen 将重新订阅,直到 errors$.pipe(delay(1000), take(retryLimit)) 发出完成或错误。