订阅数组中所有当前和未来的项目

Subscribe to all current and future items in array

将 RxJS 与 Angular5 结合使用,我有一个包含数组的服务。 随着数组的填充,一些组件将被创建/销毁/重新创建。

所以我们有这样的服务:

@Injectable
export class MyService {
   public events = [];
}

然后我们有一个像这样的组件:

@Inject(MyService)
@Component({})
export class MyComponent {

  mySub: Subscriber

  constructor(private ms: MyService){}

  ngOnInit(){

   this.mySub = Rx.Observable.from(this.ms.events).subscribe(v => {

   });

  }

}

我的问题是 - 如果事件数组中已经包含元素,那么在创建组件时,它将选取所有现有元素,但是在创建订阅后添加到数组中的元素呢?我如何监听元素何时添加到数组?

如果我使用 Subject,问题是我不认为它存储历史项目,只是触发新项目。

好的,所以我们使用 concat 运算符合并两个 observables,它似乎有效。从本质上讲,这个答案只是从头开始重新创建一个 ReplaySubject。我建议不要使用这个答案,而是使用 ReplaySubject。无论如何:

服务看起来像这样

@Injectable
export class MyService {

   public subject = new Rx.Subject();
   public events = [];

   addItem(v: any){
    this.events.push(v);
    this.subject.next(v);
  }
}

然后组件看起来像这样

@Inject(MyService)
@Component({})
export class MyComponent {

  mySub: Subscriber

  constructor(private ms: MyService){}

  ngOnInit(){

    const obs = Rx.Observable.from(this.ms.events).concat(this.ms.subject);
    this.mySub = obs.subscribe(v => {

    });

  }

}

想法是将数组中的可观察对象与主题连接起来,并且数组可观察对象中的所有项目都将首先触发。

我会将事件存储在您的服务中。这样服务就可以保留所有事件的历史记录,您可以使用主题来发出该历史记录。

@Injectable()
export class MyService {
    sub$ = new BehaviorSubject<any[]>([]);
    events: any[] = [];

    // add event to array and push to subject
    addEvent(event){
        this.events.push(event);
        this.sub$.next(this.events);
    }
    // find and remove event from array then push to subject
    removeEvent(event){
        let index = this.events.findIndex(item => {
            return event.id === item.id;
        });
        // if index is found, remove from array
        if(index !== -1){
            this.events.splice(index, 1);
            this.sub$.next(this.events);
        }
    }

}

我会使用行为主题,以便订阅的观察者收到最后一次发射(也可以使用重播主题)。

这是一个stackblitz演示

如果您担心状态,您应该使用 Subject 可以用一些数据初始化然后传播。

@Injectable
export class MyService {
   public events = [];
   public data$: Subject<any>;

   constructor() {
     this.data$ = this.events;
   }

   addItem(v: any) {
     this.event.push(v);
     this.data$.next(this.event);
   }
}

然后在你的组件中,如果你想要一个实时的数据源。

@Component({//usual angular stuff here})
export class MyComponent implements OnInit {
  myData: Subject<any>;

  constructor(private myService: MyService) { }

  ngOnInit() {
    this.myData = this.myService.data$;
  }
}

然后您可以使用 async 管道或订阅 myData 来获取您的数据。

我认为@LLai 拥有您需要的大部分内容,但我会将 BehaviorSubject 更改为 ReplaySubject 并将发射更改为单个事件(给出 Alexander Mill 的回答)。

这不包括删除事件,尽管我没有看到提到该要求。

我的服务

@Injectable()
export class MyService {
  event$ = new ReplaySubject<any>();

  addEvent(event){
    this.event$.next(event);
  }
}

我的组件

ngOnInit(){
  this.mySub = this.ms.event$.subscribe(v => {
    ...
  });
}

演示

const replaySubject$ = new Rx.ReplaySubject(); 
// Also, pass in a buffer size to stop blow-out (if applicable)
// const replaySubject$ = new Rx.ReplaySubject(maxBufferSize); 


// Events before subscribe
replaySubject$.next('event1');
replaySubject$.next('event2');

replaySubject$.subscribe(console.log);

// Events after subscribe
replaySubject$.next('event3');
replaySubject$.next('event4');
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>