Angular 5 从无尽的 BehaviorSubject 中完成一个 observable

Angular 5 Complete an obvservable from endless BehaviorSubject

所以我有一个 Observable 数据存储,每次存储更改时都有一个私有的 BehaviorSubject 发射项目。

我还有一个 public 方法,该方法 return 是从 BehaviorSubject 创建的可观察对象,可根据状态过滤项目和跳过项目。

我想知道我如何才能 return 一个在满足特定条件后自行完成的可观察对象而不将责任交给消费者?

编辑:这种情况下的解决方案是让消费者使用 .take(1) 运算符,以便它在第一次发出后完成。

下面是一些与我的问题相关的代码:

class Item {

  public id: string;
  public state: number;
};


@Injectable()
export class SomeDataService(){

  private let items   = [];                          //The store of items
  private let stream$ = new BehaviorSubject<Item[]>; //The endless stream of items.

  constructor( someService: SomeOtherService ){

    this.someService.items$.subscribe( items => {

      this.items = items;
      this.stream$.next( items );
    });
  };


  //
  public getObservable( filterID: string ): Observable<Item> {

    return this.$stream.asObservable().map( items => {

      //Find the item in the list and return it
      return items.find( item => {
        return item.id === filterID;
      });
    }).flatMap( item => {

      if( item && item.state === 3 ) { //arbitrary number
        throw Observable.throw( item );
      }

      //Transform the item and such...

      return Observable.of( item );
    }).skipWhile( item => {

      return item && item.state !== 1;
    });
  };
};

//一些其他文件来使用服务

this.someDataService.getObservable( 'uniqueID' ).finally( () => {
  console.log("Now this gets printed when it emits the first time and on errors.");
}).subscribe();

/*
  //Previous attempt
  this.someDataService.getObservable( 'uniqueID' ).finally( () => {

    console.log("I will never print this as the observable never completes.");
  }).subscribe();
*/

当您将项目存储在 private const items = []; 中时,不知道为什么要使用 BehaviorSubject 而且我个人不喜欢使用服务 'do the logic'。不管怎样,问题出在 skipWhile

例如,如果您将此项目分配给您的项目 属性 :

const itemsO: Item[] = [
    {
        id: '0',
        state: 1
    },
    {
        id: '1',
        state: 1
    },
    {
        id: '2',
        state: 1
    },
    {
        id: '0',
        state: 1
    },
];

并使用 '0' 作为参数调用 getObservable 函数,代码可以正常工作,因为 skipWhile 会发现谓词为真,所以它会完成工作。

我已经对你的代码做了这个并试过了,它工作正常:

服务:

import { Injectable } from '@angular/core';
import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/mergeMap';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/skipWhile';
class Item {

    public id: string;
    public state: number;
}

const itemsO: Item[] = [
    {
        id: '0',
        state: 1
    },
    {
        id: '1',
        state: 1
    },
    {
        id: '2',
        state: 1
    },
    {
        id: '0',
        state: 1
    },
];

@Injectable()
export class SomeDataService {

    private items = [];
    private stream$: BehaviorSubject<Item[]> = new BehaviorSubject<Item[]>(null);

    constructor() {
        this.items = itemsO;
        this.stream$.next(itemsO);
    }

    public getObservable(filterID: string): Observable<Item> {
        return this.stream$.asObservable().map(items => {

            // Find the item in the list and return it
            return items.find(item => {
                return item.id === filterID;
            });
        }).flatMap(item => {

            if (item && item.state === 3) { // arbitrary number
                throw Observable.throw(item);
            }

            // Transform the item and such...

            return Observable.of(item);
        }).skipWhile(item => {
            return item.state !== 1;
        });
    }
}

在主要组件上:

constructor(a: SomeDataService) {
        a.getObservable('0')
            .subscribe(evt => {
                console.log(evt);
                // You will see something in console
            });
    }

我不知道你想达到什么目的,但问题只在于此skipWhile。如果你想丢弃数据或发出一个空数组,如果 status !== 1 只需使用另一个运算符

所以这里的解决方案是让消费者使用 .take(1) 运算符以便在第一个项目发出后完成。

this.someDataService.getObservable( 'uniqueID' ).finally( () => {
  console.log("Now this gets printed when it emits the first time and on errors.");
}).subscribe();

/*
  //Previous attempt
  this.someDataService.getObservable( 'uniqueID' ).finally( () => {

    console.log("I will never print this as the observable never completes.");
  }).subscribe();
*/