Rx Extensions 刷新 Buffered observable 中的剩余项目

Rx Extensions flush remaining items in Buffered observable

所以我有这个代码:

ISubject<int> _processed = new ReplaySubject<int>();
_processed.Buffer(5000).Subscribe(UpdateProcessed);

// Start some process which calls _processed.OnNext

我遇到的问题是有时缓冲区没有填满,因为最后一批少于 5000 并且进程退出而没有执行对 UpdateProcessed 的调用。

有没有办法在处理完成后刷新 _processed observable 中剩余的项目?

OnComplete 将刷新缓冲区。

尝试_processed.OnComplete();

如果您不知道该过程何时完成,您可以随时添加一个时间跨度来缓冲。它会在时间跨度到期或缓冲区已满时发出项目,以先到者为准。

_processed.Buffer(TimeSpan.FromSeconds(1), 5000).Subscribe(UpdateProcessed);