如何实现 RxJS flatMapLatestTwo
How to implement RxJS flatMapLatestTwo
RxJS 的 flatMapLatest 扁平化了最新的(只有一个)嵌套的 Observable。我有一个用例,我不想要 flatMap(将过去所有嵌套的 Observable 压平),我不想要 flatMapWithConcurrency(因为它有利于旧的 Observable,而不是最新的 Observable),所以我想要的是 flatMapLatestTwo 或flatMapLatest 的某些版本,您可以在其中指定并发嵌套 Observable 的最大数量,例如flatMapLatest(2, selectorFn)
.
这是我想要的输出(_X
指的是嵌套的 Observable X
而 eX
指的是它的第 X 个 onNext 事件):
_0e0
_0e1
_1e0
_0e2
_1e1
_2e0
_1e2
_2e1
_3e0
_2e2
_3e1
_4e0
_3e2
_4e1
_3e3
_4e2
_4e3
这是 flatMapLatest 产生的结果:
_0e0
_0e1
_1e0
_1e1
_2e0
_2e1
_3e0
_3e1
_4e0
_4e1
_4e2
_4e3
我更喜欢使用现有运算符的解决方案,而不是实现这个低级别的解决方案。
我的答案是在 C# 中。对不起。
您没有指定您的观测值是热的还是冷的。您的奇怪数字可能来自这样一个事实,即您的 windowing 对您的 'inner' observable 进行了新的订阅,因为它从 window 中的第一个推到了第二个。我的第一次尝试也是如此:
var q = Observable.Interval(TimeSpan.FromSeconds(1))
.Select(i => Observable.Interval(TimeSpan.FromMilliseconds(100))
.Select(x => $"_{i}e{x}"));
w = q.Zip(q.Skip(1), (prev, curr)=> prev.Merge(curr)).Switch();
我认为您将很难做任何事情来避免这种情况,除非您创建一个操作员,因为有状态参与管理它。 (显然有人会在这里证明我错了!!)
这是我的运算符方法,它也恰好支持您要求的参数化。
public static class Ex
{
public static IObservable<T> SelectManyLatest<T>(this IObservable<IObservable<T>> source, int latest)
{
return Observable.Create<T>(o =>
{
var d = new Queue<IDisposable>();
source.Subscribe(os =>
{
if(d.Count == latest)
d.Dequeue().Dispose();
d.Enqueue(os.Subscribe(o.OnNext, o.OnError, () => {}));
}, o.OnError, o.OnCompleted);
return Disposable.Create(()=>new CompositeDisposable(d).Dispose());
});
}
}
再次为 C# 感到抱歉
这看起来很天真。我正在寻找改进的方法,但它是:
Rx.Observable.prototype.flatMapLatestN = function (count, transform) {
let queue = [];
return this.flatMap(x => {
return Rx.Observable.create(observer => {
let disposable;
if (queue.length < count) {
disposable = transform(x).subscribe(observer);
queue.push(observer);
}
else {
let earliestObserver = queue[0];
if (earliestObserver) {
earliestObserver.onCompleted();
}
disposable = transform(x).subscribe(observer);
queue.push(observer);
}
return () => {
disposable.dispose();
let i = queue.indexOf(observer);
queue.splice(i, 1);
};
});
});
};
测试:
function space(n) {
return Array(n+1).join(' ');
}
Rx.Observable
.interval(1000)
.take(6)
.flatMapLatestN(2, (x) => {
return Rx.Observable
.interval(300)
.take(10)
.map(n => `${space(x*4)}${x}-${n}`);
})
.subscribe(console.log.bind(console));
它将输出:
0-1
0-2
0-3
1-0
0-4
1-1
0-5
1-2
1-3
2-0
1-4
2-1
1-5
2-2
2-3
3-0
2-4
3-1
2-5
3-2
3-3
4-0
3-4
4-1
3-5
4-2
4-3
5-0
4-4
5-1
4-5
5-2
4-6
5-3
4-7
5-4
4-8
5-5
4-9
5-6
5-7
5-8
5-9
这是一个使用内置运算符的解决方案。首先,我们将 Observable 拆分为 N 个 observable,其中每个 observable 都有相应的序列中的第 N 个最新项。然后我们 flatMapLatest
每个并合并它们。
Rx.Observable.prototype.flatMapLatestN = function(N, selector, thisArg) {
var self = this;
return Rx.Observable.merge(Rx.Observable.range(0, N).flatMap(function(n) {
return self.filter(function(x, i) {
return i % N === n;
}).flatMapLatest(selector, thisArg);
}));
}
或者在 ES2015 中:
Rx.Observable.prototype.flatMapLatestN = function(N, selector, thisArg) {
const {merge, range} = Rx.Observable;
return merge(
range(0, N)
.flatMap(n =>
this.filter((x, i) => i % N === n).flatMapLatest(selector, thisArg))
);
}
使用与戴维相同的测试:
N=1
的输出(与 flatMapLatest
相同):
0-0
0-1
0-2
1-0
1-1
1-2
2-0
2-1
2-2
3-0
3-1
3-2
4-0
4-1
4-2
5-0
5-1
5-2
5-3
5-4
5-5
5-6
5-7
5-8
5-9
N=2
的输出:
0-0
0-1
0-2
0-3
1-0
0-4
1-1
0-5
1-2
1-3
2-0
1-4
2-1
1-5
2-2
2-3
3-0
2-4
3-1
2-5
3-2
3-3
4-0
3-4
4-1
3-5
4-2
4-3
5-0
4-4
5-1
4-5
5-2
4-6
5-3
4-7
5-4
4-8
5-5
4-9
5-6
5-7
5-8
5-9
N=3
的输出:
0-0
0-1
0-2
0-3
1-0
0-4
1-1
0-5
1-2
0-6
1-3
2-0
0-7
1-4
2-1
0-8
1-5
2-2
1-6
2-3
3-0
1-7
2-4
3-1
1-8
2-5
3-2
2-6
3-3
4-0
2-7
3-4
4-1
2-8
3-5
4-2
3-6
4-3
5-0
3-7
4-4
5-1
3-8
4-5
5-2
3-9
4-6
5-3
4-7
5-4
4-8
5-5
4-9
5-6
5-7
5-8
5-9
RxJS 的 flatMapLatest 扁平化了最新的(只有一个)嵌套的 Observable。我有一个用例,我不想要 flatMap(将过去所有嵌套的 Observable 压平),我不想要 flatMapWithConcurrency(因为它有利于旧的 Observable,而不是最新的 Observable),所以我想要的是 flatMapLatestTwo 或flatMapLatest 的某些版本,您可以在其中指定并发嵌套 Observable 的最大数量,例如flatMapLatest(2, selectorFn)
.
这是我想要的输出(_X
指的是嵌套的 Observable X
而 eX
指的是它的第 X 个 onNext 事件):
_0e0
_0e1
_1e0
_0e2
_1e1
_2e0
_1e2
_2e1
_3e0
_2e2
_3e1
_4e0
_3e2
_4e1
_3e3
_4e2
_4e3
这是 flatMapLatest 产生的结果:
_0e0
_0e1
_1e0
_1e1
_2e0
_2e1
_3e0
_3e1
_4e0
_4e1
_4e2
_4e3
我更喜欢使用现有运算符的解决方案,而不是实现这个低级别的解决方案。
我的答案是在 C# 中。对不起。
您没有指定您的观测值是热的还是冷的。您的奇怪数字可能来自这样一个事实,即您的 windowing 对您的 'inner' observable 进行了新的订阅,因为它从 window 中的第一个推到了第二个。我的第一次尝试也是如此:
var q = Observable.Interval(TimeSpan.FromSeconds(1))
.Select(i => Observable.Interval(TimeSpan.FromMilliseconds(100))
.Select(x => $"_{i}e{x}"));
w = q.Zip(q.Skip(1), (prev, curr)=> prev.Merge(curr)).Switch();
我认为您将很难做任何事情来避免这种情况,除非您创建一个操作员,因为有状态参与管理它。 (显然有人会在这里证明我错了!!)
这是我的运算符方法,它也恰好支持您要求的参数化。
public static class Ex
{
public static IObservable<T> SelectManyLatest<T>(this IObservable<IObservable<T>> source, int latest)
{
return Observable.Create<T>(o =>
{
var d = new Queue<IDisposable>();
source.Subscribe(os =>
{
if(d.Count == latest)
d.Dequeue().Dispose();
d.Enqueue(os.Subscribe(o.OnNext, o.OnError, () => {}));
}, o.OnError, o.OnCompleted);
return Disposable.Create(()=>new CompositeDisposable(d).Dispose());
});
}
}
再次为 C# 感到抱歉
这看起来很天真。我正在寻找改进的方法,但它是:
Rx.Observable.prototype.flatMapLatestN = function (count, transform) {
let queue = [];
return this.flatMap(x => {
return Rx.Observable.create(observer => {
let disposable;
if (queue.length < count) {
disposable = transform(x).subscribe(observer);
queue.push(observer);
}
else {
let earliestObserver = queue[0];
if (earliestObserver) {
earliestObserver.onCompleted();
}
disposable = transform(x).subscribe(observer);
queue.push(observer);
}
return () => {
disposable.dispose();
let i = queue.indexOf(observer);
queue.splice(i, 1);
};
});
});
};
测试:
function space(n) {
return Array(n+1).join(' ');
}
Rx.Observable
.interval(1000)
.take(6)
.flatMapLatestN(2, (x) => {
return Rx.Observable
.interval(300)
.take(10)
.map(n => `${space(x*4)}${x}-${n}`);
})
.subscribe(console.log.bind(console));
它将输出:
0-1
0-2
0-3
1-0
0-4
1-1
0-5
1-2
1-3
2-0
1-4
2-1
1-5
2-2
2-3
3-0
2-4
3-1
2-5
3-2
3-3
4-0
3-4
4-1
3-5
4-2
4-3
5-0
4-4
5-1
4-5
5-2
4-6
5-3
4-7
5-4
4-8
5-5
4-9
5-6
5-7
5-8
5-9
这是一个使用内置运算符的解决方案。首先,我们将 Observable 拆分为 N 个 observable,其中每个 observable 都有相应的序列中的第 N 个最新项。然后我们 flatMapLatest
每个并合并它们。
Rx.Observable.prototype.flatMapLatestN = function(N, selector, thisArg) {
var self = this;
return Rx.Observable.merge(Rx.Observable.range(0, N).flatMap(function(n) {
return self.filter(function(x, i) {
return i % N === n;
}).flatMapLatest(selector, thisArg);
}));
}
或者在 ES2015 中:
Rx.Observable.prototype.flatMapLatestN = function(N, selector, thisArg) {
const {merge, range} = Rx.Observable;
return merge(
range(0, N)
.flatMap(n =>
this.filter((x, i) => i % N === n).flatMapLatest(selector, thisArg))
);
}
使用与戴维相同的测试:
N=1
的输出(与 flatMapLatest
相同):
0-0
0-1
0-2
1-0
1-1
1-2
2-0
2-1
2-2
3-0
3-1
3-2
4-0
4-1
4-2
5-0
5-1
5-2
5-3
5-4
5-5
5-6
5-7
5-8
5-9
N=2
的输出:
0-0
0-1
0-2
0-3
1-0
0-4
1-1
0-5
1-2
1-3
2-0
1-4
2-1
1-5
2-2
2-3
3-0
2-4
3-1
2-5
3-2
3-3
4-0
3-4
4-1
3-5
4-2
4-3
5-0
4-4
5-1
4-5
5-2
4-6
5-3
4-7
5-4
4-8
5-5
4-9
5-6
5-7
5-8
5-9
N=3
的输出:
0-0
0-1
0-2
0-3
1-0
0-4
1-1
0-5
1-2
0-6
1-3
2-0
0-7
1-4
2-1
0-8
1-5
2-2
1-6
2-3
3-0
1-7
2-4
3-1
1-8
2-5
3-2
2-6
3-3
4-0
2-7
3-4
4-1
2-8
3-5
4-2
3-6
4-3
5-0
3-7
4-4
5-1
3-8
4-5
5-2
3-9
4-6
5-3
4-7
5-4
4-8
5-5
4-9
5-6
5-7
5-8
5-9