在 RX 中迭代从 IGroupedObservable 中选择的 IEnumerable

Iterate IEnumerable selected from IGroupedObservable in RX

我有一个 IObservable<T> 序列,其中 T 是一个 KeyValuePair<TKey, TValue>,我使用 System.Reactive.Linq 中的 GroupBy 对其进行分组。

我想对每个 IGroupedObservable<TKey, KeyValuePair<TKey, TValue>> 执行聚合操作,但该聚合被定义为 Func<IEnumerable<TValue>, TValue>

例如,这里我想计算每个不同单词出现的次数并将其打印到控制台:

Func<IEnumerable<int>, int> aggregate = x => x.Count();

using (new[] { "one", "fish", "two", "fish" }
    .Select(x => new KeyValuePair<string, int>(x, 1))
    .ToObservable()
    .GroupBy(x => x.Key)
    .Select(x => new KeyValuePair<string, IEnumerable<int>>(
                x.Key,
                x.Select(y => y.Value).ToEnumerable()))
    //.SubscribeOn(Scheduler.Default)
    .Subscribe(x => Console.WriteLine($"{x.Key} [{aggregate(x.Value)}]")))
{
}

我希望输出与此类似(顺序不重要):

one [1]
fish [2]
two [1]

但是它要么阻塞(可能是死锁)要么根本不输出(当我取消注释 LINQ 语句的 SubscribeOn 子句时)。

我试图从实际使用场景中减少上面的代码,它试图link两个TPL数据流块但遇到类似的行为:

Func<IEnumerable<int>, int> aggregate = x => x.Sum();

var sourceBlock = new TransformBlock<string, KeyValuePair<string, int>>(x => new KeyValuePair<string, int>(x, 1));
var targetBlock = new ActionBlock<KeyValuePair<string, IEnumerable<int>>>(x => Console.WriteLine($"{x.Key} [{aggregate(x.Value)}]"));
using (sourceBlock.AsObservable()
    .GroupBy(x => x.Key)
    .Select(x => new KeyValuePair<string, IEnumerable<int>>(x.Key, x.Select(y => y.Value).ToEnumerable()))
    .Subscribe(targetBlock.AsObserver()))
{
    foreach (var kvp in new[] { "one", "fish", "two", "fish" })
    {
        sourceBlock.Post(kvp);
    }
    sourceBlock.Complete();
    targetBlock.Completion.Wait();
}

我知道框架提供了 SumCountIObservable<T> 上运行的方法,但我只能使用 IEnumerable<T> 聚合函数。

我是否误解了 ToEnumerable,我该如何解决?

编辑: IEnumerable<T> 的约束由我尝试 link 的两个数据流块的 target 引入,其签名不是我要更改的。

GroupBy 的工作原理是这样的:当新元素到达时,它提取一个键并查看之前是否已经观察到该键。如果不是 - 它会创建新组(新的可观察对象)并将键和可观察对象推送给您。关键点是 - 当您订阅 GroupBy 并且项目被推送到您的订阅时 - 序列是 not 尚未分组。推送的是组键和另一个可观察对象(IGroupedObservable),该组中的元素将被推送到该对象。

您在代码中所做的实质上是订阅 GroupBy,然后在 GroupBy 订阅中阻止尝试枚举 IGroupingObservable。但是此时你不能枚举它,因为分组还没有完成。为了完成它 - GroupBy 应该处理整个序列,但它不能,因为它被阻止等待您的订阅处理程序完成。并且您的订阅处理程序等待 GroupBy 完成(阻止尝试枚举尚未就绪的序列)。因此你有一个僵局。

如果您尝试将 ObserveOn(Scheduler.Default) 引入 运行 您在线程池线程上的订阅处理程序 - 这将无济于事。它会消除死锁,但会引入竞争条件,你会丢失项目,因为你只在开始枚举 ToEnumerable 的结果时订阅了单个组。在这一点上可能为时已晚,在您订阅它(通过开始枚举)之前,一些和一些项目已经被推送到单个组可观察。这些项目不会重播,因此会丢失。

什么可以帮助它确实使用为 IObservable 提供的 Count(),但出于某种原因你说你不能那样做。

在你使用数据流块的情况下,你可以尝试这样的事情:

sourceBlock.AsObservable()
    .GroupBy(x => x.Key)
    .Select(x => {
        var res = new { x.Key, Value = x.Select(y => y.Value).Replay() };
        // subscribe right here
        // Replay will ensure that no items are missed
        res.Value.Connect();
        return res;
    })                
    // observe on thread pool threads to not deadlock if necessary
    // in the example with datablock in your question - it is not
    //.ObserveOn(Scheduler.Default)
    // now no deadlock and no missing items
    .Select(x => new KeyValuePair<string, IEnumerable<int>>(x.Key, x.Value.ToEnumerable()))
    .Subscribe(targetBlock.AsObserver())