通过 属性 和超时对 IObservable 进行分组

Grouping IObservable by both property and timeout

我需要将对象流转换为对象批次流,使用 Reactive Extensions:

按 属性 值对它们进行分组
class Record
{
    public string Group;
    public int Value;
}

IObservable<List<Record>> Process(IObservable<Record> source, TimeSpan timeout)
{
    // ...
}

当发生以下任一情况时,批处理完成并发送到输出流:

例如,如果a1表示new Record { Group = "a", Value = 1}

input:   -a1-a2-a3-b1-b2-
output:  -[a1, a2, a3]-[b1, b2]-

input:   -a1-a2----------a3-
output:  -[a1, a2]-------[a3]-

尝试了 GroupByUntilDebounceBufferTimer 的各种组合,但均无济于事。它是怎么做到的?

诀窍是将 GroupByUntilThrottle 一起使用:

IObservable<List<Record>> Process(IObservable<Record> source, TimeSpan timeout)
{
    return source.GroupByUntil(x => x.Group, g => g.Throttle(timeout))
                 .SelectMany(x => x.ToList());
}