如何使用 IObservable<int> 将 IObservable<T> 分割成不同长度的 IObservable<T[]>

How to use an IObservable<int> to segment an IObservable<T> into an IObservable<T[]> of varying length

我有一个 "values" IObservable<T> 返回 T 必须按顺序组合成可变长度数组的元素,我有一个 "control" IObservable<int> 告诉我下一个数组必须有多长。 删除一个元素,重复它,或者把结果打乱都会使结果变得毫无意义。

这是我在 Rx.NET 中重写的串行连接机器人项目。

IObservable<char> values = new [] {'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H' }.ToObservable();
IObservable<int> control = new [] { 1, 4, 2 }.ToObservable();
IObservable<char[]> result = control.SelectMany(length => values.Take(length).ToArray());

我希望看到这样的内容:

values  ----A--B--C--D--E--F--G--H-->
control --1-----4---------------2--->
result  ---[A]---------[BCDE]--[FG]->

但我目前的尝试结果是

-[A]-[AB]-[ABCD]->

您可以根据 prepare/create new observables 创建多个 helper 来构建您想要的新 observable。您可以为这些类型的可观察对象构建主题:

  1. 创建一个新的可观察对象,它重复一个唯一值,计数等于从 control 中读取的数字。从 (1, 4, 2) 你会得到 (guid_1, guid_2, guid_2, guid_2, guid_2, guid_3, guid_3)。调用这个可观察的 repeatSize.
  2. 使用 Zip() 运算符将 repeatSizevalues 中的值各合并一个。您将获得一个具有以下值的可观察值:((A,guid_1), (B,guid_2), (C,guid_2), (D,guid_2), (E,guid_2), (F,guid_3), (G,guid_3))。调用这个可观察的 zippedValues.
  3. 订阅 zippedValues 和 put/add 列表中的原始值。同时从 repeatSize observable 中保存之前的值。将其与 repeatSize 的当前值进行比较。当它被更改时(比如从 guid_2guid_3),你知道你已经到达 end/start,所以你可以将填充的列表发送到一个新的可观察对象。之后您再次重置列表并再次开始填充它。

您可能需要构建 2-3 个 Subject<T> 对象,订阅它们并使用多个 OnNext() 调用从其他可观察对象的订阅中填充它们。

我没有对此进行测试,但我相信您可以结合使用 Zip()Scan() 来生成您要求的输出。

IObservable<char> values = new [] {'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H' }.ToObservable();
IObservable<int> control = new [] { 1, 4, 2 }.ToObservable();
IObservable<char[]> result = control.Zip(
    control.Scan(values, (rest, length) => rest.Skip(length)),
    (length, vals) => vals.Take(length).ToArray()
);

好的,这是满足我所有需要的代码。 Progman,您的建议在使这项工作发挥作用方面发挥了重要作用。在这里,它被包裹在一个整洁的 Observable.Create 中,并在 IObservable<T> 上变成了一个扩展方法,带有一个一次性处理压缩序列上的订阅。

    public static IObservable<T[]> Chop<T>(this IObservable<T> values, IObservable<int> control) =>
        Observable.Create<T[]>(observer => 
        {
            List<T> buffer = new List<T>();
            return values.Zip(control.SelectMany(length => Enumerable.Repeat(length, length)), 
                              (value, length) => (value, length))
                         .Subscribe(next => 
                         {
                             buffer.Add(next.value);
                             if (buffer.Count == next.length)
                             {
                                 observer.OnNext(buffer.ToArray());
                                 buffer.Clear();
                             }
                         });
        });

示例输出:

values  ----A--B--C--D--E--F--G--H--I--J--K--L--M--N--O--P-->
control --1-4-2-0-3-3--------------------------------------->
result  ---[A]---------[BCDE]-[FG]----[HIJ]----[KLM]-------->