在 Rx 中观察缓冲和分组输入

Observing buffered and group input in in Rx

我有一个主题

  public ISubject<Price> PriceTicksSubject { get; }

这是缓冲和订阅如下

void DoSubscribe()
{
      PriceTicksSubject.Buffer(TimeSpan.FromMilliseconds(5000)).
                Select(buffer => buffer.GroupBy(tick => tick.Key, (key, res) => res.Last())).
                ObserveOn(NewThreadScheduler.Default).Subscribe(x=> 
                        SendtoClients(x));    

}

我想要的是能够在五秒内将所有经过缓冲和分组的项目作为一个可枚举的项目发送给客户。但是,上面的代码每五秒多次调用 SendToClients()(乘以键数)。

关于如何解决这个问题的任何指示?

将缓冲结果分组到 onNext 操作中:

void DoSubscribe()
{
    PriceTicksSubject.Buffer(TimeSpan.FromMilliseconds(5000))
        .ObserveOn(NewThreadScheduler.Default)
        .Subscribe(bufferedPrices =>
        {
            SendtoClients(bufferedPrices.GroupBy(x => x.Key).Select(g => g.Last()).ToArray());
        });
}