如何使用 IObservable<int> 将 IObservable<T> 分割成不同长度的 IObservable<T[]>
How to use an IObservable<int> to segment an IObservable<T> into an IObservable<T[]> of varying length
我有一个 "values" IObservable<T>
返回 T
必须按顺序组合成可变长度数组的元素,我有一个 "control" IObservable<int>
告诉我下一个数组必须有多长。
删除一个元素,重复它,或者把结果打乱都会使结果变得毫无意义。
这是我在 Rx.NET 中重写的串行连接机器人项目。
IObservable<char> values = new [] {'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H' }.ToObservable();
IObservable<int> control = new [] { 1, 4, 2 }.ToObservable();
IObservable<char[]> result = control.SelectMany(length => values.Take(length).ToArray());
我希望看到这样的内容:
values ----A--B--C--D--E--F--G--H-->
control --1-----4---------------2--->
result ---[A]---------[BCDE]--[FG]->
但我目前的尝试结果是
-[A]-[AB]-[ABCD]->
您可以根据 prepare/create new observables 创建多个 helper 来构建您想要的新 observable。您可以为这些类型的可观察对象构建主题:
- 创建一个新的可观察对象,它重复一个唯一值,计数等于从
control
中读取的数字。从(1, 4, 2)
你会得到(guid_1, guid_2, guid_2, guid_2, guid_2, guid_3, guid_3)
。调用这个可观察的repeatSize
. - 使用
Zip()
运算符将repeatSize
和values
中的值各合并一个。您将获得一个具有以下值的可观察值:((A,guid_1), (B,guid_2), (C,guid_2), (D,guid_2), (E,guid_2), (F,guid_3), (G,guid_3))
。调用这个可观察的zippedValues
. - 订阅
zippedValues
和 put/add 列表中的原始值。同时从repeatSize
observable 中保存之前的值。将其与repeatSize
的当前值进行比较。当它被更改时(比如从guid_2
到guid_3
),你知道你已经到达 end/start,所以你可以将填充的列表发送到一个新的可观察对象。之后您再次重置列表并再次开始填充它。
您可能需要构建 2-3 个 Subject<T>
对象,订阅它们并使用多个 OnNext()
调用从其他可观察对象的订阅中填充它们。
我没有对此进行测试,但我相信您可以结合使用 Zip()
和 Scan()
来生成您要求的输出。
IObservable<char> values = new [] {'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H' }.ToObservable();
IObservable<int> control = new [] { 1, 4, 2 }.ToObservable();
IObservable<char[]> result = control.Zip(
control.Scan(values, (rest, length) => rest.Skip(length)),
(length, vals) => vals.Take(length).ToArray()
);
好的,这是满足我所有需要的代码。 Progman,您的建议在使这项工作发挥作用方面发挥了重要作用。在这里,它被包裹在一个整洁的 Observable.Create
中,并在 IObservable<T>
上变成了一个扩展方法,带有一个一次性处理压缩序列上的订阅。
public static IObservable<T[]> Chop<T>(this IObservable<T> values, IObservable<int> control) =>
Observable.Create<T[]>(observer =>
{
List<T> buffer = new List<T>();
return values.Zip(control.SelectMany(length => Enumerable.Repeat(length, length)),
(value, length) => (value, length))
.Subscribe(next =>
{
buffer.Add(next.value);
if (buffer.Count == next.length)
{
observer.OnNext(buffer.ToArray());
buffer.Clear();
}
});
});
示例输出:
values ----A--B--C--D--E--F--G--H--I--J--K--L--M--N--O--P-->
control --1-4-2-0-3-3--------------------------------------->
result ---[A]---------[BCDE]-[FG]----[HIJ]----[KLM]-------->