Rx:一个类似 zip 的运算符,在其中一个流结束后继续?
Rx: a zip-like operator that continues after one of the streams ended?
我希望合并异步开始和结束的流(可观察对象):
-1----1----1----1---|->
-2----2--|->
[ optional_zip(sum) ]
-1----3----3----1---|->
我需要它做什么:将音频流添加在一起。它们是音频流 "chunks",但我将在这里用整数表示它们。所以第一个剪辑正在播放:
-1----1----1----1---|->
然后第二个开始,稍后:
-2----2--|->
求和的结果应该是:
-1----3----3----1---|->
但如果任何压缩流结束,标准压缩就会完成。我希望这个 optional_zip 即使其中一个流结束也能继续。在 Rx 中有什么方法可以做到这一点,还是我必须自己通过修改现有的 Zip 来实现它?
注意:我使用的是 RxPy,但这里的社区似乎很小,而且 Rx 运算符似乎在各种语言中都很通用,所以我也将其标记为 rx-java 和 rx-js。
所以我得到了一些代码,我认为它可以满足您的需求大部分。基本上,我创建了一个函数 zipAndContinue
,它将像 zip
一样运行,除了它会继续发射项目,只要一些底层流仍然有数据要发射。此功能仅用冷可观察量进行了 [简要] 测试。
此外,corrections/enhancements/edits 欢迎。
function zipAndContinue() {
// Augment each observable so it ends with null
const observables = Array.prototype.slice.call(arguments, 0).map(x => endWithNull(x));
const combined$ = Rx.Observable.combineLatest(observables);
// The first item from the combined stream is our first 'zipped' item
const first$ = combined$.first();
// We calculate subsequent 'zipped' item by only grabbing
// the items from the buffer that have all of the required updated
// items (remember, combineLatest emits each time any of the streams
// updates).
const subsequent$ = combined$
.skip(1)
.bufferWithCount(arguments.length)
.flatMap(zipped)
.filter(xs => !xs.every(x => x === null));
// We return the concatenation of these two streams
return first$.concat(subsequent$)
}
下面是使用的实用函数:
function endWithNull(observable) {
return Rx.Observable.create(observer => {
return observable.subscribe({
onNext: x => observer.onNext(x),
onError: x => observer.onError(x),
onCompleted: () => {
observer.onNext(null);
observer.onCompleted();
}
})
})
}
function zipped(xs) {
const nonNullCounts = xs.map(xs => xs.filter(x => x !== null).length);
// The number of streams that are still emitting
const stillEmitting = Math.max.apply(null, nonNullCounts);
if (stillEmitting === 0) {
return Rx.Observable.empty();
}
// Skip any intermittent results
return Rx.Observable.from(xs).skip(stillEmitting - 1);
}
下面是示例用法:
const one$ = Rx.Observable.from([1, 2, 3, 4, 5, 6]);
const two$ = Rx.Observable.from(['one']);
const three$ = Rx.Observable.from(['a', 'b']);
zipAndContinue(one$, two$, three$)
.subscribe(x => console.log(x));
// >> [ 1, 'one', 'a' ]
// >> [ 2, null, 'b' ]
// >> [ 3, null, null ]
// >> [ 4, null, null ]
// >> [ 5, null, null ]
// >> [ 6, null, null ]
还有一个js-fiddle(可以点击运行然后打开控制台):https://jsfiddle.net/ptx4g6wd/
我会把它分成两部分来解决这个问题。首先,我想要一些接受 Observable<Observable<T>>
并产生 Observable<Observable<T>[]>
的东西,其中数组仅包含 "active" (即不完整)可观察值。任何时候一个新元素被添加到外部 observable,并且任何时候一个内部 observable 完成,一个包含适当 observable 的新数组将被发射。这本质上是对主流的 "scan" 减少。
一旦你有了可以做到这一点的东西,你就可以使用 flatMapLatest 和 zip 来获得你想要的东西。
第一部分我的基本尝试如下:
function active(ss$) {
const activeStreams = new Rx.Subject();
const elements = [];
const subscriptions = [];
ss$.subscribe(s => {
var include = true;
const subscription = s.subscribe(x => {}, x => {}, x => {
include = false;
const i = elements.indexOf(s);
if (i > -1) {
elements.splice(i, 1);
activeStreams.onNext(elements.slice());
}
});
if (include) {
elements.push(s);
subscriptions.push(subscription);
activeStreams.onNext(elements.slice());
}
});
return Rx.Observable.using(
() => new Rx.Disposable(() => subscriptions.forEach(x => x.dispose())),
() => activeStreams
);
}
从那里开始,您只需将其压缩并压平,如下所示:
const zipped = active(c$).flatMapLatest(x =>
x.length === 0 ? Rx.Observable.never()
: x.length === 1 ? x[0]
: Rx.Observable.zip(x, (...args) => args.reduce((a, c) => a + c))
);
我假设零个活动流应该不产生任何东西,一个活动流应该产生它自己的元素,两个或多个流应该全部压缩在一起(所有这些都反映在地图应用程序中)。
我的(公认的相当有限的)测试有这种组合产生你想要的结果。
顺便提一下,问得好。我还没有看到任何解决问题第一部分的方法(尽管我绝不是 Rx 专家;如果有人知道已经这样做了,请 post 详细信息)。
我希望合并异步开始和结束的流(可观察对象):
-1----1----1----1---|->
-2----2--|->
[ optional_zip(sum) ]
-1----3----3----1---|->
我需要它做什么:将音频流添加在一起。它们是音频流 "chunks",但我将在这里用整数表示它们。所以第一个剪辑正在播放:
-1----1----1----1---|->
然后第二个开始,稍后:
-2----2--|->
求和的结果应该是:
-1----3----3----1---|->
但如果任何压缩流结束,标准压缩就会完成。我希望这个 optional_zip 即使其中一个流结束也能继续。在 Rx 中有什么方法可以做到这一点,还是我必须自己通过修改现有的 Zip 来实现它?
注意:我使用的是 RxPy,但这里的社区似乎很小,而且 Rx 运算符似乎在各种语言中都很通用,所以我也将其标记为 rx-java 和 rx-js。
所以我得到了一些代码,我认为它可以满足您的需求大部分。基本上,我创建了一个函数 zipAndContinue
,它将像 zip
一样运行,除了它会继续发射项目,只要一些底层流仍然有数据要发射。此功能仅用冷可观察量进行了 [简要] 测试。
此外,corrections/enhancements/edits 欢迎。
function zipAndContinue() {
// Augment each observable so it ends with null
const observables = Array.prototype.slice.call(arguments, 0).map(x => endWithNull(x));
const combined$ = Rx.Observable.combineLatest(observables);
// The first item from the combined stream is our first 'zipped' item
const first$ = combined$.first();
// We calculate subsequent 'zipped' item by only grabbing
// the items from the buffer that have all of the required updated
// items (remember, combineLatest emits each time any of the streams
// updates).
const subsequent$ = combined$
.skip(1)
.bufferWithCount(arguments.length)
.flatMap(zipped)
.filter(xs => !xs.every(x => x === null));
// We return the concatenation of these two streams
return first$.concat(subsequent$)
}
下面是使用的实用函数:
function endWithNull(observable) {
return Rx.Observable.create(observer => {
return observable.subscribe({
onNext: x => observer.onNext(x),
onError: x => observer.onError(x),
onCompleted: () => {
observer.onNext(null);
observer.onCompleted();
}
})
})
}
function zipped(xs) {
const nonNullCounts = xs.map(xs => xs.filter(x => x !== null).length);
// The number of streams that are still emitting
const stillEmitting = Math.max.apply(null, nonNullCounts);
if (stillEmitting === 0) {
return Rx.Observable.empty();
}
// Skip any intermittent results
return Rx.Observable.from(xs).skip(stillEmitting - 1);
}
下面是示例用法:
const one$ = Rx.Observable.from([1, 2, 3, 4, 5, 6]);
const two$ = Rx.Observable.from(['one']);
const three$ = Rx.Observable.from(['a', 'b']);
zipAndContinue(one$, two$, three$)
.subscribe(x => console.log(x));
// >> [ 1, 'one', 'a' ]
// >> [ 2, null, 'b' ]
// >> [ 3, null, null ]
// >> [ 4, null, null ]
// >> [ 5, null, null ]
// >> [ 6, null, null ]
还有一个js-fiddle(可以点击运行然后打开控制台):https://jsfiddle.net/ptx4g6wd/
我会把它分成两部分来解决这个问题。首先,我想要一些接受 Observable<Observable<T>>
并产生 Observable<Observable<T>[]>
的东西,其中数组仅包含 "active" (即不完整)可观察值。任何时候一个新元素被添加到外部 observable,并且任何时候一个内部 observable 完成,一个包含适当 observable 的新数组将被发射。这本质上是对主流的 "scan" 减少。
一旦你有了可以做到这一点的东西,你就可以使用 flatMapLatest 和 zip 来获得你想要的东西。
第一部分我的基本尝试如下:
function active(ss$) {
const activeStreams = new Rx.Subject();
const elements = [];
const subscriptions = [];
ss$.subscribe(s => {
var include = true;
const subscription = s.subscribe(x => {}, x => {}, x => {
include = false;
const i = elements.indexOf(s);
if (i > -1) {
elements.splice(i, 1);
activeStreams.onNext(elements.slice());
}
});
if (include) {
elements.push(s);
subscriptions.push(subscription);
activeStreams.onNext(elements.slice());
}
});
return Rx.Observable.using(
() => new Rx.Disposable(() => subscriptions.forEach(x => x.dispose())),
() => activeStreams
);
}
从那里开始,您只需将其压缩并压平,如下所示:
const zipped = active(c$).flatMapLatest(x =>
x.length === 0 ? Rx.Observable.never()
: x.length === 1 ? x[0]
: Rx.Observable.zip(x, (...args) => args.reduce((a, c) => a + c))
);
我假设零个活动流应该不产生任何东西,一个活动流应该产生它自己的元素,两个或多个流应该全部压缩在一起(所有这些都反映在地图应用程序中)。
我的(公认的相当有限的)测试有这种组合产生你想要的结果。
顺便提一下,问得好。我还没有看到任何解决问题第一部分的方法(尽管我绝不是 Rx 专家;如果有人知道已经这样做了,请 post 详细信息)。