Angular 2 跨组件共享 websocket 服务

Angular 2 share websocket service across components

我正在使用 angular 2 构建一个 Web 应用程序,我希望在其中让多个组件侦听同一服务。此服务 returns 可观察到 returns 来自 websocket 的传入数据。我根据 this 示例编写了代码。

当前问题是: 数据通过服务从主组件发送到服务器(使用 websockets)并返回数据。但是,只有 home.component 中的观察者被调用(id:room.created 和数据),而不是导航栏中的观察者。

有人能告诉我为什么不同时调用两者吗?我还尝试将消息 $.subscribe 添加到 app.component 但无济于事。

现在,我们来看代码。

returns 可观察的消息服务。组件使用此服务从中发送和接收消息。

@Injectable()
export class MessageService {
    private _messages: Rx.Subject<Message>;
    messages$: Rx.Observable<Message>;

    constructor(wsService: SocketService, private configuration: Configuration) {
      console.log('messag eservice');
      this._messages = <Rx.Subject<Message>>wsService
        .connect()
        .map((response: MessageEvent): Message => {
            let data = JSON.parse(response.data);
            return {
                id: data.id,
                data: data.data,
            }
        });

      this.messages$ = this._messages.asObservable();
    }

    public send(message: Message): void {
      this._messages.next(message);
    }
} 

创建 websocket 连接并将自身绑定到此套接字的输入和输出的套接字服务。

import { Injectable } from '@angular/core';
import * as Rx from "rxjs/Rx";
import { Configuration } from '../app.constants';

@Injectable()
export class SocketService {
    private subject: Rx.Subject<MessageEvent>;

    constructor(private configuration: Configuration){};

    public connect(wsNamespace = ''): Rx.Subject<MessageEvent> {
        var url = this.configuration.wsUrl + wsNamespace;
        if(!this.subject) {
            this.subject = this.create(url);
        }
        return this.subject;
    }

    private create(url): Rx.Subject<MessageEvent> {
        let ws = new WebSocket(url);

        // bind ws events to observable (streams)
        let observable = Rx.Observable.create((obs: Rx.Observer<MessageEvent>) => {
            ws.onmessage = obs.next.bind(obs);
            ws.onerror = obs.error.bind(obs);
            ws.onclose = obs.complete.bind(obs);

            return ws.close.bind(ws);
        });

        // on obs next (send something in the stream) send it using ws.
        let observer = {
            next: (data: Object) => {
                if (ws.readyState === WebSocket.OPEN) {
                    ws.send(JSON.stringify(data));
                }
            },
        };

        return Rx.Subject.create(observer, observable);
    }
}

具有以下提供商的应用程序组件:

  providers: [MessageService, SocketService, Configuration, AuthService]

我正在我的主程序中实例化提供程序 app.component 以确保消息和套接字服务不会被实例化两次。

我的 home.component 看起来像这样(这是一个使用路由加载的页面):

import { Component, OnInit } from '@angular/core';
import { Subject } from 'rxjs';
import { Router }    from '@angular/router';
import { MessageService } from '../../services/message.service';

@Component({
  ...
  providers: []
})
export class HomeComponent implements OnInit {
  constructor(private router: Router, private messageService: MessageService) {}

  ngOnInit(): void {
    this.messageService.send({
      id: 'room.create',
      data: {'name': 'Blaat'}
    });

    this.messageService.messages$.subscribe(msg => {
      console.log(msg);
        if(msg.id == 'room.created') {
            // navigate naar games!
        }
    });
  }

}

我的导航栏组件看起来像这样(指令):

import { Component, OnInit } from '@angular/core';
import { MessageService } from '../../services/message.service';

@Component({
  moduleId: module.id,
  selector: 'navbar',
  templateUrl: 'navbar.component.html',
  styleUrls: ['navbar.component.css']
})
export class Navbar implements OnInit {

  constructor(private messageService: MessageService) { }

  ngOnInit() {

    this.messageService.messages$.subscribe(msg => {
      console.log(msg);
        if(msg.id == 'room.created') {
            // navigate naar games!
        }
    });
  }

}

您的 observable create 函数似乎被调用了多次,很可能是两个组件 => 两个订阅 => 两个 observable create 函数调用。所以最新的 observable create fn 覆盖了之前的 observable 对 websocket onmessage、onerror 和 onclose 的回调。您应该多播底层可观察对象以防止这种情况发生(共享运算符应该可以解决问题)。

        // bind ws events to observable (streams)
        let observable = Rx.Observable.create((obs: Rx.Observer<MessageEvent>) => {
            ws.onmessage = obs.next.bind(obs);
            ws.onerror = obs.error.bind(obs);
            ws.onclose = obs.complete.bind(obs);

            return ws.close.bind(ws);
        }).share();

有关如何正确执行此操作的更多有用资源 https://github.com/ReactiveX/rxjs/blob/master/src/observable/dom/WebSocketSubject.ts https://github.com/blesh/RxSocketSubject

如果有人想使用服务在 angular 上发送加密请求并获取加密响应,然后将其解密以在模板中使用 这是下面的代码

mycomponent.ts 文件:::

import { Component, TemplateRef, OnInit, ViewChild } from '@angular/core';
import { myService } from 'src/app/my.service';
import { Base64 } from 'js-base64';
import { SelectItem } from 'primeng/api';
import { ActivatedRoute, Router } from '@angular/router';


getDataFromService() {
    this._myService.getData("api_name/url")
      .subscribe(data => {
        let myResp = Base64.decode(this.myData = data);
        this.myResponse = JSON.parse(myResp );
}

myservice.ts 文件:::

import { Injectable } from '@angular/core';
import { HttpClient, HttpHeaders, HttpParams } from '@angular/common/http';
import { Observable } from 'rxjs';
import { Iwatchlist } from './watchlist';
import { Base64 } from 'js-base64';

private _url: string = "http://192.168.10.10:1000";

getData(svcName): Observable<any> {
  let baseUrl= this._url + "/"+svcName;

    const httpOptions = {
     headers: { 'Content-Type': 'application/json' },
      "request": {
        "FormFactor": "M",
        "svcGroup": "portfolio",
        "svcVersion": "1.0.0",
        "svcName": svcName,
        "requestType": "U",
        "data": {
          "gcid": "123",
          "gscid": "abc123"
        }
      }
    };
  
    let encryptedData = Base64.encode(JSON.stringify(httpOptions));
    let postRequest = this.http.post(baseUrl, encryptedData, {responseType: 'text'});
    return postRequest;
  }