在 RxJS 中分支和(重新)加入值对
Branching and (re-)joining value pairs in RxJS
我想创建一个
的流
- 拆分值并在单独的流上处理每个部分
- 每个流都会转换数据,我无法控制应用的转换
- (重新)将部分值与其相应的计数器部分合并
我之所以要这样做,是为了保证值的完整性。或者至少是其中的一部分。
因为每个流都可能有一些异步操作,所以在加入流时它们不会按顺序出现。使用某种 concat()
也不起作用,因为它会阻止所有传入值。处理应该并行进行。
举例说明:
o
|
| [{a1,b1}, {a2,b2}, ...]
|
+
/ \
{a<x>} / \ {b<x>}
/ \
| |
| + async(b<x>) -> b'<x>
| |
\ /
\ /
\ /
\ /
+ join(a<x>, b'<x>)
|
| [{a1,b'1}, {a2,b'2}, ...]
|
(subscribe)
我知道这可以通过 result selector
函数来完成。例如
input$.mergeMap(
({a, b}) => Rx.Observable.of(b).let(async),
({a}, processedB) => ({a, b:processedB})
);
但是,(a) 这将导致 async
对于每个值总是 setup/tear 下降。我希望部分流只被初始化一次。此外,(b) 这仅适用于一个异步流。
我也尝试过使用 window*
,但不知道如何重新加入值。还尝试使用 goupBy
但没有成功。
编辑:
这是我目前的尝试。它有提到的问题 (a)。 Init...
和 Completed...
应该只记录一次。
const doSomethignAsync = data$ => {
console.log('Init...') // Should happen once.
return data$
.mergeMap(val => Rx.Observable.of(val.data).delay(val.delay))
.finally(() => console.log('Completed...')); // Should never happen
};
const input$ = new Rx.Subject();
const out$ = input$
.mergeMap(
({ a, b }) => Rx.Observable.of(b).let(doSomethignAsync),
({ a }, asyncResult ) => ({ a, b:asyncResult })
)
.subscribe(({a, b}) => {
if (a === b) {
console.log(`Re-joined [${a}, ${b}] correctly.`);
} else {
console.log(`Joined [${a}, ${b}]...`); // Should never happen
}
});
input$.next({ a: 1, b: { data: 1, delay: 2000 } });
input$.next({ a: 2, b: { data: 2, delay: 1000 } });
input$.next({ a: 3, b: { data: 3, delay: 3000 } });
input$.next({ a: 4, b: { data: 4, delay: 0 } });
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
这是一个要解决的相当复杂的问题,具体如何解决将取决于您的用例的非常具体的细节,这些细节没有包括在内。
就是说,这里是一种可能的方式,它做出了一堆假设。它有点通用,就像与 let
.
一起使用的自定义运算符
(旁注:我将其命名为 "collate",但这是一个糟糕且极具误导性的名称,但没时间命名...)
const collate = (...segments) => source$ =>
source$
.mergeMap((obj, index) => {
return segments.map(({ key, work }) => {
const input = obj[key];
const output$ = work(input);
return Observable.from(output$).map(output => ({
index,
result: { [key]: output }
}))
})
})
.mergeAll()
.groupBy(
obj => obj.index,
obj => obj.result,
group$ => group$.skip(segments.length - 1)
)
.mergeMap(group$ =>
group$.reduce(
(obj, result) => Object.assign(obj, result),
{}
)
);
这是一个用法示例:
const result$ = input$.let(
collate({
key: 'a',
work: a => {
// do stuff with "a"
return Observable.of(a).map(d => d + '-processed-A');
}
}, {
key: 'b',
work: b => {
// do stuff with "b"
return Observable.of(b).map(d => d + '-processed-B');
}
})
);
给定输入 { a: '1', b: '1 }
,它会输出 { a: '1-processed-A', b: '1-processed-B' }
等等,正确分组,同时尽可能多地同时进行——它所做的唯一缓冲是将所有段匹配在一起以获得特定的输入。
这是一个 运行 演示 https://jsbin.com/yuruvar/edit?js,output
细分
可能有更多 clear/simpler 方法可以做到这一点,特别是如果您可以对某些内容进行硬编码而不是使它们通用。但让我们分解一下我所做的。
const collate = (...segments) => source$ =>
source$
// for every input obj we use the index as an ID
// (which is provided by Rx as autoincrementing)
.mergeMap((obj, index) => {
// segments is the configuration of how we should
// chunk our data into concurrent processing channels.
// So we're returning an array, which mergeMap will consume
// as if it were an Observable, or we could have used
// Observable.from(arr) to be even more clear
return segments.map(({ key, work }) => {
const input = obj[key];
// the `work` function is expected to return
// something Observable-like
const output$ = work(input);
return Observable.from(output$).map(output => ({
// Placing the index we closed over lets us later
// stitch each segment back to together
index,
result: { [key]: output }
}))
})
})
// I had returned Array<Observable> in mergeMap
// so we need to flatten one more level. This is
// rather confusing...prolly clearer ways but #YOLO
.mergeAll()
// now we have a stream of all results for each segment
// in no guaranteed order so we need to group them together
.groupBy(
obj => obj.index,
obj => obj.result,
// this is tough to explain. this is used as a notifier
// to say when to complete() the group$, we want complete() it
// after we've received every segment for that group, so in the
// notifier we skip all except the last one we expect
// but remember this doesn't skip the elements downstream!
// only as part of the durationSelector notifier
group$ => group$.skip(segments.length - 1)
)
.mergeMap(group$ =>
// merge every segment object that comes back into one object
// so it has the same shape as it came in, but which the results
group$.reduce(
(obj, result) => Object.assign(obj, result),
{}
)
);
我没有考虑或担心错误 handling/propagation 可能如何工作,因为这在很大程度上取决于您的用例。如果您无法控制每个段的处理,那么还建议包括某种超时和 .take(1)
,否则您可能会泄漏订阅。
我想创建一个
的流- 拆分值并在单独的流上处理每个部分
- 每个流都会转换数据,我无法控制应用的转换
- (重新)将部分值与其相应的计数器部分合并
我之所以要这样做,是为了保证值的完整性。或者至少是其中的一部分。
因为每个流都可能有一些异步操作,所以在加入流时它们不会按顺序出现。使用某种 concat()
也不起作用,因为它会阻止所有传入值。处理应该并行进行。
举例说明:
o
|
| [{a1,b1}, {a2,b2}, ...]
|
+
/ \
{a<x>} / \ {b<x>}
/ \
| |
| + async(b<x>) -> b'<x>
| |
\ /
\ /
\ /
\ /
+ join(a<x>, b'<x>)
|
| [{a1,b'1}, {a2,b'2}, ...]
|
(subscribe)
我知道这可以通过 result selector
函数来完成。例如
input$.mergeMap(
({a, b}) => Rx.Observable.of(b).let(async),
({a}, processedB) => ({a, b:processedB})
);
但是,(a) 这将导致 async
对于每个值总是 setup/tear 下降。我希望部分流只被初始化一次。此外,(b) 这仅适用于一个异步流。
我也尝试过使用 window*
,但不知道如何重新加入值。还尝试使用 goupBy
但没有成功。
编辑:
这是我目前的尝试。它有提到的问题 (a)。 Init...
和 Completed...
应该只记录一次。
const doSomethignAsync = data$ => {
console.log('Init...') // Should happen once.
return data$
.mergeMap(val => Rx.Observable.of(val.data).delay(val.delay))
.finally(() => console.log('Completed...')); // Should never happen
};
const input$ = new Rx.Subject();
const out$ = input$
.mergeMap(
({ a, b }) => Rx.Observable.of(b).let(doSomethignAsync),
({ a }, asyncResult ) => ({ a, b:asyncResult })
)
.subscribe(({a, b}) => {
if (a === b) {
console.log(`Re-joined [${a}, ${b}] correctly.`);
} else {
console.log(`Joined [${a}, ${b}]...`); // Should never happen
}
});
input$.next({ a: 1, b: { data: 1, delay: 2000 } });
input$.next({ a: 2, b: { data: 2, delay: 1000 } });
input$.next({ a: 3, b: { data: 3, delay: 3000 } });
input$.next({ a: 4, b: { data: 4, delay: 0 } });
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
这是一个要解决的相当复杂的问题,具体如何解决将取决于您的用例的非常具体的细节,这些细节没有包括在内。
就是说,这里是一种可能的方式,它做出了一堆假设。它有点通用,就像与 let
.
(旁注:我将其命名为 "collate",但这是一个糟糕且极具误导性的名称,但没时间命名...)
const collate = (...segments) => source$ =>
source$
.mergeMap((obj, index) => {
return segments.map(({ key, work }) => {
const input = obj[key];
const output$ = work(input);
return Observable.from(output$).map(output => ({
index,
result: { [key]: output }
}))
})
})
.mergeAll()
.groupBy(
obj => obj.index,
obj => obj.result,
group$ => group$.skip(segments.length - 1)
)
.mergeMap(group$ =>
group$.reduce(
(obj, result) => Object.assign(obj, result),
{}
)
);
这是一个用法示例:
const result$ = input$.let(
collate({
key: 'a',
work: a => {
// do stuff with "a"
return Observable.of(a).map(d => d + '-processed-A');
}
}, {
key: 'b',
work: b => {
// do stuff with "b"
return Observable.of(b).map(d => d + '-processed-B');
}
})
);
给定输入 { a: '1', b: '1 }
,它会输出 { a: '1-processed-A', b: '1-processed-B' }
等等,正确分组,同时尽可能多地同时进行——它所做的唯一缓冲是将所有段匹配在一起以获得特定的输入。
这是一个 运行 演示 https://jsbin.com/yuruvar/edit?js,output
细分
可能有更多 clear/simpler 方法可以做到这一点,特别是如果您可以对某些内容进行硬编码而不是使它们通用。但让我们分解一下我所做的。
const collate = (...segments) => source$ =>
source$
// for every input obj we use the index as an ID
// (which is provided by Rx as autoincrementing)
.mergeMap((obj, index) => {
// segments is the configuration of how we should
// chunk our data into concurrent processing channels.
// So we're returning an array, which mergeMap will consume
// as if it were an Observable, or we could have used
// Observable.from(arr) to be even more clear
return segments.map(({ key, work }) => {
const input = obj[key];
// the `work` function is expected to return
// something Observable-like
const output$ = work(input);
return Observable.from(output$).map(output => ({
// Placing the index we closed over lets us later
// stitch each segment back to together
index,
result: { [key]: output }
}))
})
})
// I had returned Array<Observable> in mergeMap
// so we need to flatten one more level. This is
// rather confusing...prolly clearer ways but #YOLO
.mergeAll()
// now we have a stream of all results for each segment
// in no guaranteed order so we need to group them together
.groupBy(
obj => obj.index,
obj => obj.result,
// this is tough to explain. this is used as a notifier
// to say when to complete() the group$, we want complete() it
// after we've received every segment for that group, so in the
// notifier we skip all except the last one we expect
// but remember this doesn't skip the elements downstream!
// only as part of the durationSelector notifier
group$ => group$.skip(segments.length - 1)
)
.mergeMap(group$ =>
// merge every segment object that comes back into one object
// so it has the same shape as it came in, but which the results
group$.reduce(
(obj, result) => Object.assign(obj, result),
{}
)
);
我没有考虑或担心错误 handling/propagation 可能如何工作,因为这在很大程度上取决于您的用例。如果您无法控制每个段的处理,那么还建议包括某种超时和 .take(1)
,否则您可能会泄漏订阅。