使用 RxJS 5 将普通字符串 [] 转换为 Observable<string[]> 并将其连接到另一个 Observable<string[]>
Convert a plain string[] into a Observable<string[]> and concat it to another Observable<string[]> using RxJS 5
我正在尝试将普通的 string[]
转换为 Observable<string[]>
并将其连接到现有的 Observable<string[]>
。
然后我将使用 angular2 async
管道来显示 Observable
。
这是我的代码:
import {Injectable} from "angular2/core";
import {Observable} from "rxjs/Observable";
import 'rxjs/Rx';
@Injectable()
export class AppService {
constructor() {
console.log('constructor', 'appService');
this.constructSomeObservable();
}
someObservable$:Observable <string[]>;
constructSomeObservable() {
this.someObservable$ = Observable.create(observer => {
const eventSource = new EventSource('/interval-sse-observable');
eventSource.onmessage = x => observer.next(JSON.parse(x.data));
eventSource.onerror = x => observer.error(console.log('EventSource failed'));
return () => {
eventSource.close();
};
});
this.someObservable$.subscribe(
theStrings=> {
console.log(theStrings);
//Somehow convert the plain array of strings to an observable and concat it to this.someObservable$ observable...
},
error=>console.log(error)
);
}
}
有人可以帮忙吗?
此外,我想确保服务实例Observable<string[]>
随着EventSource的重复调用而不断更新。我的订阅逻辑在正确的地方吗?
编辑 1:我尝试使用 RxJS concat
运算符如下:
this.someObservable$.subscribe(
theStrings=> {
console.log(theStrings);
this.someObservable$ = this.someObservable$.concat(Observable.create(theStrings));
},
error=>console.log(error)
);
}
连同 angular2 async
管道:
<ul>
<li *ngFor="#s of appService.someObservable$ | async">
a string: {{ s }}
</li>
</ul>
页面上什么也没有显示;字符串刚刚显示在控制台上...
我哪里错了?
编辑 2:该应用可在 github here
上使用
edit 3:我已经考虑了 Thierry 的建议,尤其是为了订阅而使用 async
管道以及扫描运算符的使用.
现在唯一剩下的问题是我需要单击路由器 link 以便在模板上呈现字符串...模板不会自动更新...
查看 github 上的项目和相关标签:https://github.com/balteo/demo-angular2-rxjs/tree/36864628/536299
我会利用 scan
运算符来做到这一点。这是一个示例:
@Component({
selector: 'app'
template: `
<div>
<div *ngFor="#elt of someObservable$ | async">{{elt.name}</div>
</div>
`
})
export class App {
constructor() {
this.someObservable$ = Observable.create((observer) => {
const eventSource = new EventSource('/interval-sse-observable');
eventSource.onmessage = x => observer.next(JSON.parse(x.data));
eventSource.onerror = x => observer.error(console.log('EventSource failed'));
return () => {
eventSource.close();
};
})
.startWith([])
.scan((acc,value) => acc.concat(value));
}
}
这里是对应的plunkr:https://plnkr.co/edit/St7LozX3bnOBcoHaG4uM?p=preview.
有关详细信息,请参阅此问题:
我正在尝试将普通的 string[]
转换为 Observable<string[]>
并将其连接到现有的 Observable<string[]>
。
然后我将使用 angular2 async
管道来显示 Observable
。
这是我的代码:
import {Injectable} from "angular2/core";
import {Observable} from "rxjs/Observable";
import 'rxjs/Rx';
@Injectable()
export class AppService {
constructor() {
console.log('constructor', 'appService');
this.constructSomeObservable();
}
someObservable$:Observable <string[]>;
constructSomeObservable() {
this.someObservable$ = Observable.create(observer => {
const eventSource = new EventSource('/interval-sse-observable');
eventSource.onmessage = x => observer.next(JSON.parse(x.data));
eventSource.onerror = x => observer.error(console.log('EventSource failed'));
return () => {
eventSource.close();
};
});
this.someObservable$.subscribe(
theStrings=> {
console.log(theStrings);
//Somehow convert the plain array of strings to an observable and concat it to this.someObservable$ observable...
},
error=>console.log(error)
);
}
}
有人可以帮忙吗?
此外,我想确保服务实例Observable<string[]>
随着EventSource的重复调用而不断更新。我的订阅逻辑在正确的地方吗?
编辑 1:我尝试使用 RxJS concat
运算符如下:
this.someObservable$.subscribe(
theStrings=> {
console.log(theStrings);
this.someObservable$ = this.someObservable$.concat(Observable.create(theStrings));
},
error=>console.log(error)
);
}
连同 angular2 async
管道:
<ul>
<li *ngFor="#s of appService.someObservable$ | async">
a string: {{ s }}
</li>
</ul>
页面上什么也没有显示;字符串刚刚显示在控制台上...
我哪里错了?
编辑 2:该应用可在 github here
上使用edit 3:我已经考虑了 Thierry 的建议,尤其是为了订阅而使用 async
管道以及扫描运算符的使用.
现在唯一剩下的问题是我需要单击路由器 link 以便在模板上呈现字符串...模板不会自动更新...
查看 github 上的项目和相关标签:https://github.com/balteo/demo-angular2-rxjs/tree/36864628/536299
我会利用 scan
运算符来做到这一点。这是一个示例:
@Component({
selector: 'app'
template: `
<div>
<div *ngFor="#elt of someObservable$ | async">{{elt.name}</div>
</div>
`
})
export class App {
constructor() {
this.someObservable$ = Observable.create((observer) => {
const eventSource = new EventSource('/interval-sse-observable');
eventSource.onmessage = x => observer.next(JSON.parse(x.data));
eventSource.onerror = x => observer.error(console.log('EventSource failed'));
return () => {
eventSource.close();
};
})
.startWith([])
.scan((acc,value) => acc.concat(value));
}
}
这里是对应的plunkr:https://plnkr.co/edit/St7LozX3bnOBcoHaG4uM?p=preview.
有关详细信息,请参阅此问题: