将最后一项推送到 Observable(序列)

Take the last item pushed to an Observable (Sequence)

我在 class 中有一个 IObservable<Item>,我想公开一个只读的 属性,它提供在给定时间推送到可观察对象的最后一个项目。所以它将提供单个值Item

如果没有值被推送,那么它必须return一个默认值。

如何在不订阅 observable 和 "backing field" 的情况下做到这一点?

假设有一个热可观察对象。

对于observable = source.Replay(1); observable.Connect();

提供值:

public int Value => observable.Take(1).Amb(Observable.Return(defaultValue)).Wait();

这将 return 一个默认值,以防没有任何值被推送。

您想要从 Reactive 过渡到 state,因此支持字段不是一个糟糕的选择。你提到你不想订阅,但要观察任何东西:某事,某处必须订阅

只是在这里补充一下@Asti 的回答,也许可以帮助您解决挫败感:

Observable 不是物理的 'thing',它更像是一个逻辑概念。经常将 Rx 与 LINQ 进行比较,并且在很多时候这是一个公平的比较。但当您开始谈论数据结构时,它会崩溃:LINQ 的枚举与用于学习目的的列表非常相似。

但是,在 Rx 方面,根本没有与 List 等效的东西。可观察对象是一种瞬态数据结构,所有运算符都处理这种瞬态。如果您正在寻找永久状态,那么您将离开 Rx。

话虽如此,将可观察对象转换为某种状态是一个常见问题,并且有一些包可以帮助您:ReactiveUI 可能是最著名的。 ReactiveProperty 是另一个。这两个包都有缺陷,但可能对你有帮助。

如果您只是想寻找一种更简单的方法来获得支持字段,而无需 boiler-plating 支持字段,这将有效:

public static class ReactivePropertyExtensions
{
    public static ReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source)
    {
        return new ReactiveProperty<T>(source);
    }

    public static ReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source, T defaultValue)
    {
        return new ReactiveProperty<T>(source, defaultValue);
    }
}

public class ReactiveProperty<T> : IDisposable
{
    private IObservable<T> Source { get; }
    private IDisposable Subscription { get; }
    public T Value { get; private set; }

    public ReactiveProperty(IObservable<T> source)
        : this(source, default(T)) { }

    public ReactiveProperty(IObservable<T> source, T defaultValue)
    {
        Value = defaultValue;
        Source = source;
        Subscription = source.Subscribe(t => Value = t);
    }

    public void Dispose()
    {
        Subscription.Dispose();
    }
}

使用示例:

var ticker = Observable.Interval(TimeSpan.FromSeconds(1))
    .Publish().RefCount();

var latestTickerValue = ticker.ToReactiveProperty();
Console.WriteLine(latestTickerValue.Value);
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine(latestTickerValue.Value);
await Task.Delay(TimeSpan.FromSeconds(3));
Console.WriteLine(latestTickerValue.Value);

这是另一种定义 Value 属性 的方式,本着 Asti 的 .

的精神
private readonly IObservable<Item> _source;
private readonly IObservable<Item> _lastValue;

public SomeClass() // Constructor
{
    _source = /* Initialize the source observable (hot) */

    _lastValue = _source
        .Catch(Observable.Never<Item>())
        .Concat(Observable.Never<Item>())
        .Publish(default)
        .AutoConnect(0)
        .FirstAsync();
}

public Item Value => _lastValue.Wait();

接受initialValue参数的Publish运算符...

Returns a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue. This operator is a specialization of Multicast using a BehaviorSubject<T>.

BehaviorSubject<T> 是一个专门的ISubject<T>...

Represents a value that changes over time. Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications.

已添加 CatchConcat 运算符以保留最后一个值,即使在源序列正常或异常完成的情况下也是如此。

就我个人而言,我会犹豫是否使用此解决方案,因为在 Do 运算符中更新的 volatile 字段会更自然地完成同样的事情。我发布它主要是为了演示 Rx 功能。