使用 Reactive Extensions、嵌套订阅按组缓冲
Buffer group by groups with Reactive Extensions, nested subscribe
我有一个事件源可以生成属于特定组的事件。我想缓冲这些组并将这些组(分批)发送到存储。到目前为止我有这个:
eventSource
.GroupBy(event => event.GroupingKey)
.Select(group => new { group.Key, Events = group })
.Subscribe(group => group.Events
.Buffer(TimeSpan.FromSeconds(60), 100)
.Subscribe(list => SendToStorage(list)));
所以有一个嵌套订阅组中的事件。不知何故,我认为有更好的方法,但我还没有弄清楚。
这是一种方法
(from g in eventSource.GroupByUntil(e => e.GroupingKey,
g => g.Buffer(TimeSpan.FromSeconds(60), 100))
from b in g.ToList()
select b).Subscribe(SendToStorage);
解决方法如下:
eventSource
.GroupBy(e => e.GroupingKey)
.SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100))
.Subscribe(list => SendToStorage(list));
这里有一些可以帮助您的一般规则'reduce':
1) 嵌套订阅通常固定为 Select
嵌套订阅之前的所有内容,然后是 Merge
,然后是嵌套订阅。所以应用它,你会得到这个:
eventSource
.GroupBy(e => e.GroupingKey)
.Select(group => new { group.Key, Events = group })
.Select(group => group.Events.Buffer(TimeSpan.FromSeconds(60), 100)) //outer subscription selector
.Merge()
.Subscribe(list => SendToStorage(list));
2) 你显然可以组合两个连续的选择(并且因为你没有对匿名对象做任何事情,可以删除它):
eventSource
.GroupBy(e => e.GroupingKey)
.Select(group => group.Buffer(TimeSpan.FromSeconds(60), 100))
.Merge()
.Subscribe(list => SendToStorage(list));
3) 最后,Select
后跟 Merge
可以简化为 SelectMany
:
eventSource
.GroupBy(e => e.GroupingKey)
.SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100))
.Subscribe(list => SendToStorage(list));
我有一个事件源可以生成属于特定组的事件。我想缓冲这些组并将这些组(分批)发送到存储。到目前为止我有这个:
eventSource
.GroupBy(event => event.GroupingKey)
.Select(group => new { group.Key, Events = group })
.Subscribe(group => group.Events
.Buffer(TimeSpan.FromSeconds(60), 100)
.Subscribe(list => SendToStorage(list)));
所以有一个嵌套订阅组中的事件。不知何故,我认为有更好的方法,但我还没有弄清楚。
这是一种方法
(from g in eventSource.GroupByUntil(e => e.GroupingKey,
g => g.Buffer(TimeSpan.FromSeconds(60), 100))
from b in g.ToList()
select b).Subscribe(SendToStorage);
解决方法如下:
eventSource
.GroupBy(e => e.GroupingKey)
.SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100))
.Subscribe(list => SendToStorage(list));
这里有一些可以帮助您的一般规则'reduce':
1) 嵌套订阅通常固定为 Select
嵌套订阅之前的所有内容,然后是 Merge
,然后是嵌套订阅。所以应用它,你会得到这个:
eventSource
.GroupBy(e => e.GroupingKey)
.Select(group => new { group.Key, Events = group })
.Select(group => group.Events.Buffer(TimeSpan.FromSeconds(60), 100)) //outer subscription selector
.Merge()
.Subscribe(list => SendToStorage(list));
2) 你显然可以组合两个连续的选择(并且因为你没有对匿名对象做任何事情,可以删除它):
eventSource
.GroupBy(e => e.GroupingKey)
.Select(group => group.Buffer(TimeSpan.FromSeconds(60), 100))
.Merge()
.Subscribe(list => SendToStorage(list));
3) 最后,Select
后跟 Merge
可以简化为 SelectMany
:
eventSource
.GroupBy(e => e.GroupingKey)
.SelectMany(group => group.Buffer(TimeSpan.FromSeconds(60), 100))
.Subscribe(list => SendToStorage(list));