在 RX 中迭代从 IGroupedObservable 中选择的 IEnumerable
Iterate IEnumerable selected from IGroupedObservable in RX
我有一个 IObservable<T>
序列,其中 T
是一个 KeyValuePair<TKey, TValue>
,我使用 System.Reactive.Linq
中的 GroupBy
对其进行分组。
我想对每个 IGroupedObservable<TKey, KeyValuePair<TKey, TValue>>
执行聚合操作,但该聚合被定义为 Func<IEnumerable<TValue>, TValue>
。
例如,这里我想计算每个不同单词出现的次数并将其打印到控制台:
Func<IEnumerable<int>, int> aggregate = x => x.Count();
using (new[] { "one", "fish", "two", "fish" }
.Select(x => new KeyValuePair<string, int>(x, 1))
.ToObservable()
.GroupBy(x => x.Key)
.Select(x => new KeyValuePair<string, IEnumerable<int>>(
x.Key,
x.Select(y => y.Value).ToEnumerable()))
//.SubscribeOn(Scheduler.Default)
.Subscribe(x => Console.WriteLine($"{x.Key} [{aggregate(x.Value)}]")))
{
}
我希望输出与此类似(顺序不重要):
one [1]
fish [2]
two [1]
但是它要么阻塞(可能是死锁)要么根本不输出(当我取消注释 LINQ 语句的 SubscribeOn
子句时)。
我试图从实际使用场景中减少上面的代码,它试图link两个TPL数据流块但遇到类似的行为:
Func<IEnumerable<int>, int> aggregate = x => x.Sum();
var sourceBlock = new TransformBlock<string, KeyValuePair<string, int>>(x => new KeyValuePair<string, int>(x, 1));
var targetBlock = new ActionBlock<KeyValuePair<string, IEnumerable<int>>>(x => Console.WriteLine($"{x.Key} [{aggregate(x.Value)}]"));
using (sourceBlock.AsObservable()
.GroupBy(x => x.Key)
.Select(x => new KeyValuePair<string, IEnumerable<int>>(x.Key, x.Select(y => y.Value).ToEnumerable()))
.Subscribe(targetBlock.AsObserver()))
{
foreach (var kvp in new[] { "one", "fish", "two", "fish" })
{
sourceBlock.Post(kvp);
}
sourceBlock.Complete();
targetBlock.Completion.Wait();
}
我知道框架提供了 Sum
和 Count
在 IObservable<T>
上运行的方法,但我只能使用 IEnumerable<T>
聚合函数。
我是否误解了 ToEnumerable
,我该如何解决?
编辑:
IEnumerable<T>
的约束由我尝试 link 的两个数据流块的 target 引入,其签名不是我要更改的。
GroupBy
的工作原理是这样的:当新元素到达时,它提取一个键并查看之前是否已经观察到该键。如果不是 - 它会创建新组(新的可观察对象)并将键和可观察对象推送给您。关键点是 - 当您订阅 GroupBy
并且项目被推送到您的订阅时 - 序列是 not 尚未分组。推送的是组键和另一个可观察对象(IGroupedObservable
),该组中的元素将被推送到该对象。
您在代码中所做的实质上是订阅 GroupBy
,然后在 GroupBy
订阅中阻止尝试枚举 IGroupingObservable
。但是此时你不能枚举它,因为分组还没有完成。为了完成它 - GroupBy
应该处理整个序列,但它不能,因为它被阻止等待您的订阅处理程序完成。并且您的订阅处理程序等待 GroupBy
完成(阻止尝试枚举尚未就绪的序列)。因此你有一个僵局。
如果您尝试将 ObserveOn(Scheduler.Default)
引入 运行 您在线程池线程上的订阅处理程序 - 这将无济于事。它会消除死锁,但会引入竞争条件,你会丢失项目,因为你只在开始枚举 ToEnumerable
的结果时订阅了单个组。在这一点上可能为时已晚,在您订阅它(通过开始枚举)之前,一些和一些项目已经被推送到单个组可观察。这些项目不会重播,因此会丢失。
什么可以帮助它确实使用为 IObservable
提供的 Count()
,但出于某种原因你说你不能那样做。
在你使用数据流块的情况下,你可以尝试这样的事情:
sourceBlock.AsObservable()
.GroupBy(x => x.Key)
.Select(x => {
var res = new { x.Key, Value = x.Select(y => y.Value).Replay() };
// subscribe right here
// Replay will ensure that no items are missed
res.Value.Connect();
return res;
})
// observe on thread pool threads to not deadlock if necessary
// in the example with datablock in your question - it is not
//.ObserveOn(Scheduler.Default)
// now no deadlock and no missing items
.Select(x => new KeyValuePair<string, IEnumerable<int>>(x.Key, x.Value.ToEnumerable()))
.Subscribe(targetBlock.AsObserver())
我有一个 IObservable<T>
序列,其中 T
是一个 KeyValuePair<TKey, TValue>
,我使用 System.Reactive.Linq
中的 GroupBy
对其进行分组。
我想对每个 IGroupedObservable<TKey, KeyValuePair<TKey, TValue>>
执行聚合操作,但该聚合被定义为 Func<IEnumerable<TValue>, TValue>
。
例如,这里我想计算每个不同单词出现的次数并将其打印到控制台:
Func<IEnumerable<int>, int> aggregate = x => x.Count();
using (new[] { "one", "fish", "two", "fish" }
.Select(x => new KeyValuePair<string, int>(x, 1))
.ToObservable()
.GroupBy(x => x.Key)
.Select(x => new KeyValuePair<string, IEnumerable<int>>(
x.Key,
x.Select(y => y.Value).ToEnumerable()))
//.SubscribeOn(Scheduler.Default)
.Subscribe(x => Console.WriteLine($"{x.Key} [{aggregate(x.Value)}]")))
{
}
我希望输出与此类似(顺序不重要):
one [1]
fish [2]
two [1]
但是它要么阻塞(可能是死锁)要么根本不输出(当我取消注释 LINQ 语句的 SubscribeOn
子句时)。
我试图从实际使用场景中减少上面的代码,它试图link两个TPL数据流块但遇到类似的行为:
Func<IEnumerable<int>, int> aggregate = x => x.Sum();
var sourceBlock = new TransformBlock<string, KeyValuePair<string, int>>(x => new KeyValuePair<string, int>(x, 1));
var targetBlock = new ActionBlock<KeyValuePair<string, IEnumerable<int>>>(x => Console.WriteLine($"{x.Key} [{aggregate(x.Value)}]"));
using (sourceBlock.AsObservable()
.GroupBy(x => x.Key)
.Select(x => new KeyValuePair<string, IEnumerable<int>>(x.Key, x.Select(y => y.Value).ToEnumerable()))
.Subscribe(targetBlock.AsObserver()))
{
foreach (var kvp in new[] { "one", "fish", "two", "fish" })
{
sourceBlock.Post(kvp);
}
sourceBlock.Complete();
targetBlock.Completion.Wait();
}
我知道框架提供了 Sum
和 Count
在 IObservable<T>
上运行的方法,但我只能使用 IEnumerable<T>
聚合函数。
我是否误解了 ToEnumerable
,我该如何解决?
编辑:
IEnumerable<T>
的约束由我尝试 link 的两个数据流块的 target 引入,其签名不是我要更改的。
GroupBy
的工作原理是这样的:当新元素到达时,它提取一个键并查看之前是否已经观察到该键。如果不是 - 它会创建新组(新的可观察对象)并将键和可观察对象推送给您。关键点是 - 当您订阅 GroupBy
并且项目被推送到您的订阅时 - 序列是 not 尚未分组。推送的是组键和另一个可观察对象(IGroupedObservable
),该组中的元素将被推送到该对象。
您在代码中所做的实质上是订阅 GroupBy
,然后在 GroupBy
订阅中阻止尝试枚举 IGroupingObservable
。但是此时你不能枚举它,因为分组还没有完成。为了完成它 - GroupBy
应该处理整个序列,但它不能,因为它被阻止等待您的订阅处理程序完成。并且您的订阅处理程序等待 GroupBy
完成(阻止尝试枚举尚未就绪的序列)。因此你有一个僵局。
如果您尝试将 ObserveOn(Scheduler.Default)
引入 运行 您在线程池线程上的订阅处理程序 - 这将无济于事。它会消除死锁,但会引入竞争条件,你会丢失项目,因为你只在开始枚举 ToEnumerable
的结果时订阅了单个组。在这一点上可能为时已晚,在您订阅它(通过开始枚举)之前,一些和一些项目已经被推送到单个组可观察。这些项目不会重播,因此会丢失。
什么可以帮助它确实使用为 IObservable
提供的 Count()
,但出于某种原因你说你不能那样做。
在你使用数据流块的情况下,你可以尝试这样的事情:
sourceBlock.AsObservable()
.GroupBy(x => x.Key)
.Select(x => {
var res = new { x.Key, Value = x.Select(y => y.Value).Replay() };
// subscribe right here
// Replay will ensure that no items are missed
res.Value.Connect();
return res;
})
// observe on thread pool threads to not deadlock if necessary
// in the example with datablock in your question - it is not
//.ObserveOn(Scheduler.Default)
// now no deadlock and no missing items
.Select(x => new KeyValuePair<string, IEnumerable<int>>(x.Key, x.Value.ToEnumerable()))
.Subscribe(targetBlock.AsObserver())