通过 属性 和超时对 IObservable 进行分组
Grouping IObservable by both property and timeout
我需要将对象流转换为对象批次流,使用 Reactive Extensions:
按 属性 值对它们进行分组
class Record
{
public string Group;
public int Value;
}
IObservable<List<Record>> Process(IObservable<Record> source, TimeSpan timeout)
{
// ...
}
当发生以下任一情况时,批处理完成并发送到输出流:
- 一个新的对象来自源流,它的
Group
值与之前的值不同
- 源流中
N
秒没有新对象
例如,如果a1
表示new Record { Group = "a", Value = 1}
:
input: -a1-a2-a3-b1-b2-
output: -[a1, a2, a3]-[b1, b2]-
input: -a1-a2----------a3-
output: -[a1, a2]-------[a3]-
尝试了 GroupByUntil
、Debounce
、Buffer
和 Timer
的各种组合,但均无济于事。它是怎么做到的?
诀窍是将 GroupByUntil
与 Throttle
一起使用:
IObservable<List<Record>> Process(IObservable<Record> source, TimeSpan timeout)
{
return source.GroupByUntil(x => x.Group, g => g.Throttle(timeout))
.SelectMany(x => x.ToList());
}
我需要将对象流转换为对象批次流,使用 Reactive Extensions:
按 属性 值对它们进行分组class Record
{
public string Group;
public int Value;
}
IObservable<List<Record>> Process(IObservable<Record> source, TimeSpan timeout)
{
// ...
}
当发生以下任一情况时,批处理完成并发送到输出流:
- 一个新的对象来自源流,它的
Group
值与之前的值不同 - 源流中
N
秒没有新对象
例如,如果a1
表示new Record { Group = "a", Value = 1}
:
input: -a1-a2-a3-b1-b2-
output: -[a1, a2, a3]-[b1, b2]-
input: -a1-a2----------a3-
output: -[a1, a2]-------[a3]-
尝试了 GroupByUntil
、Debounce
、Buffer
和 Timer
的各种组合,但均无济于事。它是怎么做到的?
诀窍是将 GroupByUntil
与 Throttle
一起使用:
IObservable<List<Record>> Process(IObservable<Record> source, TimeSpan timeout)
{
return source.GroupByUntil(x => x.Group, g => g.Throttle(timeout))
.SelectMany(x => x.ToList());
}