将多个 Observables 与不同的 actions/operations 结合起来
Combine multiple Observables with different actions/operations
我正在构建一个 Angular2 应用程序,所以我逐渐习惯了 Observables 和 Reactive Extensions 作为一个整体。我正在使用 TypeScript 和 rxjs。
现在我得到了一些对象数组的可观察对象或流。让我们说人对象。现在我有另外两个 Person-objects 流,我想将它们结合起来,所以我得到一个始终是最新的流:
var people$ = getPeople(); // Observable<Person[]>
var personAdded$ = eventHub.personAdded; // Observable<Person>;
var personRemoved$ = eventHub.personRemoved // Observable<Person>;
var allwaysUpToDatePeople$ = people$.doSomeMagic(personAdded$, personRemoved$, ...);
如果 people-stream 发出一个数组,比方说,5 个人,然后 personAdded-stream 发出一个人,allPeople-stream 将发出一个 6 的数组。
如果 personRemoved-stream 发出一个人,allPeople-stream 应该发出一组 Person-objects 而不是 personRemoved-stream 刚刚发出的那个。
rxjs 中是否有内置的方法来实现这种行为?
您想合并所有流(捉鬼敢死队风格),然后使用扫描运算符找出状态。扫描运算符的工作方式类似于 Javascript reduce.
这是一个演示...
const initialPeople = ['Person 1', 'Person 2', 'Person 3', 'Person 4'];
const initialPeople$ = Rx.Observable.from(initialPeople);
const addPeople = ['Person 5', 'Person 6', 'Person 7'];
const addPeople$ = Rx.Observable.from(addPeople)
.concatMap(x => Rx.Observable.of(x).delay(1000)); // this just makes it async
const removePeople = ['Person 2x', 'Person 4x'];
const removePeople$ = Rx.Observable.from(removePeople)
.delay(5000)
.concatMap(x => Rx.Observable.of(x).delay(1000));
const mergedStream$ = Rx.Observable.merge(initialPeople$, addPeople$, removePeople$)
mergedStream$
.scan((acc, stream) => {
if (stream.includes('x') && acc.length > 0) {
const index = acc.findIndex(person => person === stream.replace('x', ''))
acc.splice(index, 1);
} else {
acc.push(stream);
}
return acc;
}, [])
.subscribe(x => console.log(x))
// In the end, ["Person 1", "Person 3", "Person 5", "Person 6", "Person 7"]
http://jsbin.com/rozetoy/edit?js,console
您没有提到数据的结构。我使用 "x" 作为标志有点(很多)笨拙且有问题。但我想您知道如何修改扫描运算符以适合您的数据。
我的建议是将 action
的想法包装到流中,然后可以将其合并并直接应用于 Array
。
第一步是定义一些描述您的操作的函数:
function add(people, person) {
return people.concat([people]);
}
function remove(people, person) {
const index = people.indexOf(person);
return index < 0 ? people : people.splice(index, 1);
}
注意:我们避免就地改变数组,因为它可能会产生无法预料的副作用。 Purity 要求我们创建数组的副本。
现在我们可以使用这些函数并将它们提升到流中以创建一个 Observable
来发出函数:
const added$ = eventHub.personAdded.map(person => people => add(people, person));
const removed$ = eventHub.personRemoved.map(person => people => remove(people, person));
现在我们得到以下形式的事件:people => people
其中输入和输出将是一组人(在此示例中简化为字符串数组)。
现在我们如何连接它?好吧,我们真的只关心添加或删除这些事件 在 我们有一个数组将它们应用于:
const currentPeople =
// Resets this stream if a new set of people comes in
people$.switchMap(peopleArray =>
// Merge the actions together
Rx.Observable.merge(added$, removed$)
// Pass in the starting Array and apply each action as it comes in
.scan((current, op) => op(current), peopleArray)
// Always emit the starting array first
.startWith(people)
)
// This just makes sure that every new subscription doesn't restart the stream
// and every subscriber always gets the latest value
.shareReplay(1);
根据您的需要(即避免函数柯里化,或使用二分搜索),可以对这种技术进行多种优化,但我发现上述方法对于一般情况来说相对优雅。
我正在构建一个 Angular2 应用程序,所以我逐渐习惯了 Observables 和 Reactive Extensions 作为一个整体。我正在使用 TypeScript 和 rxjs。
现在我得到了一些对象数组的可观察对象或流。让我们说人对象。现在我有另外两个 Person-objects 流,我想将它们结合起来,所以我得到一个始终是最新的流:
var people$ = getPeople(); // Observable<Person[]>
var personAdded$ = eventHub.personAdded; // Observable<Person>;
var personRemoved$ = eventHub.personRemoved // Observable<Person>;
var allwaysUpToDatePeople$ = people$.doSomeMagic(personAdded$, personRemoved$, ...);
如果 people-stream 发出一个数组,比方说,5 个人,然后 personAdded-stream 发出一个人,allPeople-stream 将发出一个 6 的数组。 如果 personRemoved-stream 发出一个人,allPeople-stream 应该发出一组 Person-objects 而不是 personRemoved-stream 刚刚发出的那个。
rxjs 中是否有内置的方法来实现这种行为?
您想合并所有流(捉鬼敢死队风格),然后使用扫描运算符找出状态。扫描运算符的工作方式类似于 Javascript reduce.
这是一个演示...
const initialPeople = ['Person 1', 'Person 2', 'Person 3', 'Person 4'];
const initialPeople$ = Rx.Observable.from(initialPeople);
const addPeople = ['Person 5', 'Person 6', 'Person 7'];
const addPeople$ = Rx.Observable.from(addPeople)
.concatMap(x => Rx.Observable.of(x).delay(1000)); // this just makes it async
const removePeople = ['Person 2x', 'Person 4x'];
const removePeople$ = Rx.Observable.from(removePeople)
.delay(5000)
.concatMap(x => Rx.Observable.of(x).delay(1000));
const mergedStream$ = Rx.Observable.merge(initialPeople$, addPeople$, removePeople$)
mergedStream$
.scan((acc, stream) => {
if (stream.includes('x') && acc.length > 0) {
const index = acc.findIndex(person => person === stream.replace('x', ''))
acc.splice(index, 1);
} else {
acc.push(stream);
}
return acc;
}, [])
.subscribe(x => console.log(x))
// In the end, ["Person 1", "Person 3", "Person 5", "Person 6", "Person 7"]
http://jsbin.com/rozetoy/edit?js,console
您没有提到数据的结构。我使用 "x" 作为标志有点(很多)笨拙且有问题。但我想您知道如何修改扫描运算符以适合您的数据。
我的建议是将 action
的想法包装到流中,然后可以将其合并并直接应用于 Array
。
第一步是定义一些描述您的操作的函数:
function add(people, person) {
return people.concat([people]);
}
function remove(people, person) {
const index = people.indexOf(person);
return index < 0 ? people : people.splice(index, 1);
}
注意:我们避免就地改变数组,因为它可能会产生无法预料的副作用。 Purity 要求我们创建数组的副本。
现在我们可以使用这些函数并将它们提升到流中以创建一个 Observable
来发出函数:
const added$ = eventHub.personAdded.map(person => people => add(people, person));
const removed$ = eventHub.personRemoved.map(person => people => remove(people, person));
现在我们得到以下形式的事件:people => people
其中输入和输出将是一组人(在此示例中简化为字符串数组)。
现在我们如何连接它?好吧,我们真的只关心添加或删除这些事件 在 我们有一个数组将它们应用于:
const currentPeople =
// Resets this stream if a new set of people comes in
people$.switchMap(peopleArray =>
// Merge the actions together
Rx.Observable.merge(added$, removed$)
// Pass in the starting Array and apply each action as it comes in
.scan((current, op) => op(current), peopleArray)
// Always emit the starting array first
.startWith(people)
)
// This just makes sure that every new subscription doesn't restart the stream
// and every subscriber always gets the latest value
.shareReplay(1);
根据您的需要(即避免函数柯里化,或使用二分搜索),可以对这种技术进行多种优化,但我发现上述方法对于一般情况来说相对优雅。