如何在 Angular 10 中将数组转换为 Observable?

How do I turn an array into an Observable in Angular 10?

我正在对来自 Gemini 比特币网络套接字的数据进行简单显示。我能够订阅 websocket 并在我的订阅功能中处理传入的消息。这一切都按预期工作。 dataFromMessages 在我处理新消息时更改值。

使 dataFromMessages 成为 Observable 的下一步是什么?

我知道如果它是一个 Observable,那么我可以在我的 html 中使用“| async”。或者我可以订阅 Observable。但我不知道如何使它成为 Observable。使用 or() 函数?

output.component.ts

import { Component, OnInit } from '@angular/core';
import { UserWebsocketService } from '../services/user-websocket.service';

export class OutputComponent implements OnInit{

  dataFromMessages;

  constructor(private websocket: UserWebsocketService){}

  ngOnInit(): void {
    this.websocket.connect('wss://api.gemini.com/v1/marketdata/btcusd')
    .subscribe(message => this.process_message(message));
  }

  process_message(message){
    // *** do processing here with incoming message object***
    
    this.dataFromMessages = processed_data;
  }
}

output.component.html

<table>
  <thead>
    <tr> <th>Bid</th> <th>Ask</th> </tr>
  </thead>
  <tbody>
    <tr *ngFor="let item of dataFromMessages">
      <td>{{ item.bid }}</td>
      <td>{{ item.ask }}</td>
    </tr>
  </tbody>
</table>

dataFromMessages初始化为可观察的;

dataFromMessages$: Observable<any>;

您可以使用 rxjs pipe and tap 运算符代替订阅来完成魔术

 ngOnInit(): void {
   this.dataFromMessages$ = this.websocket.connect('wss://api.gemini.com/v1/marketdata/btcusd')
   .pipe(tap(message) => this.process_message(message));
}

并在 process_message 方法中 return 处理的消息。

process_message(message){
   // *** do processing here with incoming message object***

   return processed_data;
}

并在模板中使用异步管道订阅和读取数据。

<tr *ngFor="let item of dataFromMessages$ | async">
  <td>{{ item.bid }}</td>
  <td>{{ item.ask }}</td>
</tr>

您应该将 dataFromMessages 声明为您想要的类型的可观察对象。 然后在 ngOnInit() 中使用管道内的 rxjs map() 运算符进行处理并 return 它。在这里,您可以“正常”使用消息,在管道完成后,它将是一个可观察的转换,使用您在管道内使用的运算符。

然后在你的HTML中使用异步管道

<tr *ngFor="let item of dataFromMessages$ | async">
  <td>{{ item.bid }}</td>
  <td>{{ item.ask }}</td>
</tr>
dataFromMessages$: Observable<myDataType>;

ngOnInit(): void {
  dataFromMessages$ = this.websocket.connect('wss://api.gemini.com/v1/marketdata/btcusd').pipe(
    map(message => {

      // Do my quantum physics and message processing here

      return processed_data;
    })
  );
}