rxjs 从所有订阅者请求数据并在他们全部返回后完成

rxjs request data from all subscribers and complete after they've all returned

我正在编写一个 angular 2 应用程序,该应用程序具有同一组件的多个实例,这些实例需要相互通信。他们共享一个通信服务来完成这个。

对于这个问题,组件称为 WidgetComponent,通信服务称为 CommService。

我需要一个任何 WidgetComponent 都可以调用的 CommService 方法,它将 return 信息基于所有其他 WidgetComponent 中的信息。

逻辑流程如下:

  1. WidgetComponent 调用 CommService.getInfo()

  2. CommService 从所有其他小部件请求信息

  3. 所有 WidgetComponents return 到 CommService 的信息

  4. CommService 在所有 WidgetComponents 响应后处理所有信息

  5. CommService returns 结果到原始 WidgetComponent

我知道如何使用传统的观察者模式来解决这个问题。这将是相当直接的。

CommService 将有一个方法 onGetInfo 和 属性 onGetInfoSubscribers,例如:

var onGetInfoSubscribers = [];
onGetInfo(callback) {
    onGetInfoSubscribers.push(callback);
}

然后 CommService 会有另一个方法 getInfo,例如:

getInfo() {
    var data = [];
    for (var callback in onGetInfoSubscribers) {
        // probably add some logic to ignore the callers info
        data.push(callback());
    }

    // processData is just an arbitrary function that combines all the info from the WidgetComponents
    return processData(data);
}

然后每个组件在创建时都会向 CommService 注册,例如:

var info = { ... myData ... }
CommService.onGetInfo(function() { return info });

然后他们每个人都可以像这样调用方法:

var desiredInfo = CommService.getInfo();

这会工作得很好,但我正在研究 RxJS 和 observables,它们提供了大量的实用函数,我想更好地理解它们,但我不知道如何公平地实现它RxJS 中的简单模式。

我想我需要在CommService中暴露某种getInfoObservable,然后每个WidgetComponent都需要订阅它,然后调用getInfo方法时,CommService需要调用getInfoObservable主体的next()方法,但是当 observable 触发时,每个组件都需要以某种方式 return 将它们的信息发送给 CommService,并且 CommService 需要知道它们何时完成。

我最擅长的代码是在 CommService 中:

private infoSource = new Subject();
public info$ = this.infoSource.asObservable().combineAll(info => processData(info));

getInfo() {
    this.infoSource.next();
}

然后在WidgetComponent中:

private info = { ...myData... };
commService.info$.subscribe(e => return this.info);

但我知道这还差得远,根本不起作用。

任何正确方向的提示都将不胜感激。特别是如果他们使用 angular 2 版本的 RxJS。

这是我想到的解决方案。这不是你所追求的,但我认为它会让你接近。我不会在将响应返回给调用小部件之前对其进行聚合,但是调用小部件将在它们进入时接收所有响应。如果有必要,也许您可​​以弄清楚聚合器。

我认为在这里使用 Subject 是最好的方法。 Subject 是一种 Observable,可以同时用作 Observable 和 Observer,这使得它能够进行双向通信。

我的解决方案在 CommService 中使用了两个主题。一个处理 infoRequested 事件,一个处理 infoReceived 事件。在您的小部件既不是发布者又不是订阅者的情况下,您可以摆脱一个,但在这种情况下会导致无限循环。所以我带了两个。

一起来看看吧。这是我的通讯服务:

import { Injectable } from '@angular/core';
import { Subject } from 'rxjs/Subject';
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/operator/filter';

@Injectable()
export class CommService {

  private infoRequested$: Subject<string>;
  private infoSent$: Subject<WidgetData>;

  constructor() { 
    this.infoRequested$ = new Subject<string>();
    this.infoSent$ = new Subject<WidgetData>();
  }

  //Event that is fired when requesting information FROM a widget
  infoRequested(): Observable<any> {
    return this.infoRequested$.asObservable();
  }

  //Fired from the widget that is requesting information from all widgets
  requestInfoFromAll(widgetName: string): void {
    this.infoRequested$.next(widgetName);
  }

  //Used to send information from a widget after receiving an info Requested event
  sendInfo(widgetData: WidgetData): void {
    this.infoSent$.next(widgetData);
  }

  //Event that returns results after a widget has fired off a request Info from All
  receiveInfoFromAll(widgetName: string): Observable<any> {
    return this.infoSent$
               .asObservable()
               .filter(widget => widget.sendToWidget === widgetName);
  }

}

export class WidgetData {
  constructor(public name: string, public status: string, public sendToWidget: string) {}
}

初始化后,我们有了 infoRequested 方法。小部件订阅此事件并在触发时传输数据。它会在触发 requestInfoFromAll 事件时触发。

接下来是 Widget 用来传输数据的 sendInfo 方法。最后是 receiveInfoFromAll,它将成为通过 SendInfo 传输的数据的接收者。我也为此应用了一个过滤器,用于仅将数据发送到请求它的小部件。

在组件方面,我有一个创造性地命名为 Widget1Component 的组件,带有 widgetStatus 和 widgetName 输入。

这是我的 widget1.component:

import { Component, Input, OnInit } from '@angular/core';
import { CommService, WidgetData } from '../comm.service';

@Component({
  selector: 'app-widget1',
  template: `
    <div>
      <button type="button" (click)="onGetInfoFromWidgets()">Get Status from all Widgets</button>
    </div>
    <div *ngFor="let widget of widgets">
        {{ widget.name }} - {{ widget.status }}
    </div>
  `
})
export class Widget1Component implements OnInit {

  @Input() widgetName: string;
  @Input() widgetStatus: string;
  widgets: Array<WidgetData> = [];

  constructor(private commSvc: CommService) {}

  ngOnInit() {

    //Subscribe to event requesting my information
    this.commSvc
        .infoRequested()
        .subscribe(
            //Send my info when asked
            requester => this.commSvc.sendInfo(new WidgetData(this.widgetName, this.widgetStatus, requester))
        );

    //Subscribe to event returning all widget statuses to me
    this.commSvc
        .receiveInfoFromAll(this.widgetName)
        .subscribe(widget => this.widgets.push(widget));

  }

  onGetInfoFromWidgets() {
    this.widgets = [];
    //Trigger comm service to get statuses.
    this.commSvc.requestInfoFromAll(this.widgetName);
  }

}

最后,这是我的 app.component.html:

<app-widget1 widgetName="w1" widgetStatus="red"></app-widget1>
<app-widget1 widgetName="w2" widgetStatus="yellow"></app-widget1>
<app-widget1 widgetName="w3" widgetStatus="green"></app-widget1>

当您 运行 应用程序时,您将获得 3 个按钮。单击任何按钮应该会导致所有小部件的状态填充在该按钮下。

就是这样。这是一个有趣的编码练习。我希望它能让你指明正确的方向。

找到解决方案:)

服务代码:

// records how many components have updated the info
let observersCount$ = new BehaviorSubject(0);

// actual info which components listen to
let info$ = new BehaviorSubject({}); 

/** 
logic which updates info$ when all components shared the info.

This is our lucky thing. 
Subject stores how many observers are listening
When the count reaches the total number of observers, 
we update the info$, which sends info to all the components
**/

observersCount$
    .filter(count => (
           count === info$.observers.length && 
           count !== 0
     )
    .subscribe( () => {
        // perform the calculations etc etc., 
        info$.next( newInfo); 

        // reset the count to 0
        observersCount$.next(0);
    });    

// called by the components to update the info
public updateInfo(){
    // every time a component updates info, I increment 
    observersCount$.next(observersCount$.value+1);

    // do the stuff 

}

public getInfo$(){
    return this.info$;
}

组件代码:

constructor(private service: Service){
    service.getInfo$()
        .subscribe( info => {
           // do something with latest info 
        });
}

... 

// inform  service     
this.service.updateInfo();