使用 RxJS 5 将普通字符串 [] 转换为 Observable<string[]> 并将其连接到另一个 Observable<string[]>

Convert a plain string[] into a Observable<string[]> and concat it to another Observable<string[]> using RxJS 5

我正在尝试将普通的 string[] 转换为 Observable<string[]> 并将其连接到现有的 Observable<string[]>

然后我将使用 angular2 async 管道来显示 Observable

这是我的代码:

import {Injectable} from "angular2/core";
import {Observable} from "rxjs/Observable";
import 'rxjs/Rx';


@Injectable()
export class AppService {

    constructor() {
        console.log('constructor', 'appService');
        this.constructSomeObservable();
    }

    someObservable$:Observable <string[]>;

    constructSomeObservable() {
        this.someObservable$ = Observable.create(observer => {
            const eventSource = new EventSource('/interval-sse-observable');
            eventSource.onmessage = x => observer.next(JSON.parse(x.data));
            eventSource.onerror = x => observer.error(console.log('EventSource failed'));
            return () => {
                eventSource.close();
            };
        });

        this.someObservable$.subscribe(
            theStrings=> {
                console.log(theStrings);
                //Somehow convert the plain array of strings to an observable and concat it to this.someObservable$ observable...
            },
            error=>console.log(error)
        );
    }
}

有人可以帮忙吗?

此外,我想确保服务实例Observable<string[]>随着EventSource的重复调用而不断更新。我的订阅逻辑在正确的地方吗?

编辑 1:我尝试使用 RxJS concat 运算符如下:

    this.someObservable$.subscribe(
        theStrings=> {
            console.log(theStrings);
            this.someObservable$ = this.someObservable$.concat(Observable.create(theStrings));
        },
        error=>console.log(error)
    );
 }

连同 angular2 async 管道:

<ul>
    <li *ngFor="#s of appService.someObservable$ | async">
       a string: {{ s }}
    </li>
</ul>

页面上什么也没有显示;字符串刚刚显示在控制台上...

我哪里错了?

编辑 2:该应用可在 github here

上使用

edit 3:我已经考虑了 Thierry 的建议,尤其是为了订阅而使用 async 管道以及扫描运算符的使用.

现在唯一剩下的问题是我需要单击路由器 link 以便在模板上呈现字符串...模板不会自动更新...

查看 github 上的项目和相关标签:https://github.com/balteo/demo-angular2-rxjs/tree/36864628/536299

我会利用 scan 运算符来做到这一点。这是一个示例:

@Component({
  selector: 'app'
  template: `
    <div>
      <div *ngFor="#elt of someObservable$  | async">{{elt.name}</div>
    </div>
  `
})
export class App {
  constructor() {
    this.someObservable$ = Observable.create((observer) => {
      const eventSource = new EventSource('/interval-sse-observable');
      eventSource.onmessage = x => observer.next(JSON.parse(x.data));
      eventSource.onerror = x => observer.error(console.log('EventSource failed'));
      return () => {
        eventSource.close();
      };
    })
    .startWith([])
    .scan((acc,value) => acc.concat(value));
  }
}

这里是对应的plunkr:https://plnkr.co/edit/St7LozX3bnOBcoHaG4uM?p=preview.

有关详细信息,请参阅此问题: