Rx.NET - 设置容量并删除最早的

Rx.NET - Set capacity and drop oldest

System.Threading.Channels 允许我们指定容量和完整模式 = DropOldest。基本上当通道已满并且正在处理一条消息 10 秒时,在这 10 秒内它会丢弃新记录。

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
{
    FullMode = BoundedChannelFullMode.DropOldest
});

有没有办法用 Rx 做到这一点?

Rx observables 没有 属性 的“满”。可观察序列不是消息的存储,就像 Queue<T>Channel<T> 是。这只是 generator/propagator 条消息。一些 Rx 运算符有内部队列来执行它们的工作,例如 ConcatZip 运算符。通常这些队列是隐藏的,不能配置为“有损”。

可能具有您正在寻找的功能的 Rx 组件是 ReplaySubject<T>。该组件可以配置它可以重播的最大消息数 (int bufferSize),以及它可以在丢弃每条消息之前存储的最大持续时间 (TimeSpan window)。如果你设置了bufferSize而不是window,那么ReplaySubject<T>最终会缓冲指定数量的项目,然后缓冲区将永远保持相同的大小。每条传入消息都会导致最旧的缓冲消息被丢弃。 ReplaySubject<T> 不像 Channel<T> 那样是可消耗队列。它随时准备将其缓冲区中的所有消息传播给将来可能出现的任何新订阅者。

ReplaySubject<T>Replay 运算符用作传播器,类似于 Publish 运算符在内部由 Subject<T>.

支持的方式

添加到@Theodor Zoulias 的回答中,通道可能会变得“满”,因为您在一端写入它们并从另一端读取。它们是独立的操作,缓冲区是一个已定义的事物,具有您可以控制的预期行为。

Observables 应该立即传播任何通知——通常没有任何保留区域。一些 Rx 运算符添加一个作为其处理的一部分,但是如果正在读取的内容和正在写入的内容之间的这种分离级别对您很重要,那么 Channels 抽象可能比 Observables 更接近您的需求。

您所描述的听起来与其他 Rx 实现(如 RxJava)所称的“back pressure”相似。

这在 Rx.Net 中尚未实现,而且可能永远不会实现。