如何将 IGroupedObservable 转换为 IGrouping?

How to convert an IGroupedObservable to IGrouping?

我有一个可观察的元素序列,具有 char Key 属性,其值在 'A''E' 范围内。我想根据这个键对这些元素进行分组。对它们进行分组后,我希望通过一组可观察的结果得到结果,以便我可以分别处理每个组。我的问题是我找不到一个很好的方法来在最终的可观察对象中保留每个组的密钥。这是我正在尝试做的一个例子:

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(100))
    .Take(42)
    .GroupBy(n => (char)(65 + n % 5))
    .Select(grouped => grouped.ToArray())
    .Merge();

observable.Subscribe(group =>
    Console.WriteLine($"Group: {String.Join(", ", group)}"));

输出:

Group: 0, 5, 10, 15, 20, 25, 30, 35, 40
Group: 1, 6, 11, 16, 21, 26, 31, 36, 41
Group: 2, 7, 12, 17, 22, 27, 32, 37
Group: 3, 8, 13, 18, 23, 28, 33, 38
Group: 4, 9, 14, 19, 24, 29, 34, 39

组是正确的,但密钥 ('A' - 'E') 丢失了。 observable 的类型是 IObservable<long[]>。我想要什么 相反,是 IObservable<IGrouping<char, long>>。这样 group.Key 将在最终订阅代码中可用。但据我所知 没有内置的方法来转换 IGroupedObservable (the result of the GroupBy operator) to an IGrouping。我可以看到运算符 ToArrayToListToLookupToDictionary 等,但不是 ToGrouping 运算符。我的问题是,如何实现这个运算符?

这是我不完整的实现尝试:

public static IObservable<IGrouping<TKey, TSource>> ToGrouping<TKey, TSource>(
    this IGroupedObservable<TKey, TSource> source)
{
    return Observable.Create<IGrouping<TKey, TSource>>(observer =>
    {
        // What to do?
        return source.Subscribe();
    });
}

我的意图是在原始示例中使用它而不是 ToArray,像这样:

.Select(grouped => grouped.ToGrouping())

这可以满足您的大部分需求:

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(100))
    .Take(42)
    .GroupBy(n => (char)(65 + n % 5))
    .SelectMany(grouped => grouped.ToArray().Select(a => (key: grouped.Key, results: a)));

那是 IObservable<ValueTuple<TKey, TResult[]>。如果你想要 IGrouping 界面,你必须制作一个对象,因为我认为没有适合你的对象:

public static class Grouping
{
    // Because I'm too lazy to code types
    public static Grouping<TKey, TResult> Create<TKey, TResult>(TKey key, IEnumerable<TResult> results)
    {
        return new Grouping<TKey, TResult>(key, results);
    }
}

public class Grouping<TKey, TResult> : IGrouping<TKey, TResult>
{
    public Grouping(TKey key, IEnumerable<TResult> results)
    {
        this.Key = key;
        this.Results = results;
    }
    
    public TKey Key { get; }
    public IEnumerable<TResult> Results { get; }

    public IEnumerator<TResult> GetEnumerator()
    {
        return Results.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return Results.GetEnumerator();
    }
}

然后您的可观察值变为:

var o2 = Observable.Interval(TimeSpan.FromMilliseconds(100))
    .Take(42)
    .GroupBy(n => (char)(65 + n % 5))
    .SelectMany(grouped => grouped.ToArray().Select(a => Grouping.Create(grouped.Key, a)));

这似乎是你想要的:

IObservable<(char Key, long[] Values)> observable =
    Observable
        .Interval(TimeSpan.FromMilliseconds(100))
        .Take(42)
        .GroupBy(n => (char)(65 + n % 5))
        .Select(grouped => new { Key = grouped.Key, Values = grouped.ToArray() })
        .SelectMany(x => x.Values, (k, v) => (Key: k.Key, Values: v));

observable.Subscribe(group =>
    Console.WriteLine($"Group {group.Key}: {String.Join(", ", group.Values)}"));

我得到:

Group A: 0, 5, 10, 15, 20, 25, 30, 35, 40
Group B: 1, 6, 11, 16, 21, 26, 31, 36, 41
Group C: 2, 7, 12, 17, 22, 27, 32, 37
Group D: 3, 8, 13, 18, 23, 28, 33, 38
Group E: 4, 9, 14, 19, 24, 29, 34, 39

我找到了一种实现 ToGrouping 运算符的方法,而无需创建实现 IGrouping 接口的自定义 class。它比 Shlomo 的 .

更简洁但效率更低
/// <summary>
/// Creates an observable sequence containing a single 'IGrouping' that has the same
/// key with the source 'IGroupedObservable', and contains all of its elements.
/// </summary>
public static IObservable<IGrouping<TKey, TSource>> ToGrouping<TKey, TSource>(
    this IGroupedObservable<TKey, TSource> source)
{
    return source
        .ToList()
        .Select(list => list.GroupBy(_ => source.Key).Single());
}

此实现假定 TKey 类型未以某种疯狂的方式实现 IEquatable 接口,即 returns 相同值的不同哈希码,或认为值不相等对自己。如果发生这种情况,Single LINQ 运算符将抛出异常。