将最后一项推送到 Observable(序列)
Take the last item pushed to an Observable (Sequence)
我在 class 中有一个 IObservable<Item>
,我想公开一个只读的 属性,它提供在给定时间推送到可观察对象的最后一个项目。所以它将提供单个值Item
。
如果没有值被推送,那么它必须return一个默认值。
如何在不订阅 observable 和 "backing field" 的情况下做到这一点?
假设有一个热可观察对象。
对于observable = source.Replay(1); observable.Connect();
提供值:
public int Value =>
observable.Take(1).Amb(Observable.Return(defaultValue)).Wait();
这将 return 一个默认值,以防没有任何值被推送。
您想要从 Reactive 过渡到 state,因此支持字段不是一个糟糕的选择。你提到你不想订阅,但要观察任何东西:某事,某处必须订阅。
只是在这里补充一下@Asti 的回答,也许可以帮助您解决挫败感:
Observable 不是物理的 'thing',它更像是一个逻辑概念。经常将 Rx 与 LINQ 进行比较,并且在很多时候这是一个公平的比较。但当您开始谈论数据结构时,它会崩溃:LINQ 的枚举与用于学习目的的列表非常相似。
但是,在 Rx 方面,根本没有与 List 等效的东西。可观察对象是一种瞬态数据结构,所有运算符都处理这种瞬态。如果您正在寻找永久状态,那么您将离开 Rx。
话虽如此,将可观察对象转换为某种状态是一个常见问题,并且有一些包可以帮助您:ReactiveUI 可能是最著名的。 ReactiveProperty 是另一个。这两个包都有缺陷,但可能对你有帮助。
如果您只是想寻找一种更简单的方法来获得支持字段,而无需 boiler-plating 支持字段,这将有效:
public static class ReactivePropertyExtensions
{
public static ReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source)
{
return new ReactiveProperty<T>(source);
}
public static ReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source, T defaultValue)
{
return new ReactiveProperty<T>(source, defaultValue);
}
}
public class ReactiveProperty<T> : IDisposable
{
private IObservable<T> Source { get; }
private IDisposable Subscription { get; }
public T Value { get; private set; }
public ReactiveProperty(IObservable<T> source)
: this(source, default(T)) { }
public ReactiveProperty(IObservable<T> source, T defaultValue)
{
Value = defaultValue;
Source = source;
Subscription = source.Subscribe(t => Value = t);
}
public void Dispose()
{
Subscription.Dispose();
}
}
使用示例:
var ticker = Observable.Interval(TimeSpan.FromSeconds(1))
.Publish().RefCount();
var latestTickerValue = ticker.ToReactiveProperty();
Console.WriteLine(latestTickerValue.Value);
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine(latestTickerValue.Value);
await Task.Delay(TimeSpan.FromSeconds(3));
Console.WriteLine(latestTickerValue.Value);
这是另一种定义 Value
属性 的方式,本着 Asti 的 .
的精神
private readonly IObservable<Item> _source;
private readonly IObservable<Item> _lastValue;
public SomeClass() // Constructor
{
_source = /* Initialize the source observable (hot) */
_lastValue = _source
.Catch(Observable.Never<Item>())
.Concat(Observable.Never<Item>())
.Publish(default)
.AutoConnect(0)
.FirstAsync();
}
public Item Value => _lastValue.Wait();
接受initialValue
参数的Publish
运算符...
Returns a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue
. This operator is a specialization of Multicast
using a BehaviorSubject<T>
.
BehaviorSubject<T>
是一个专门的ISubject<T>
...
Represents a value that changes over time. Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications.
已添加 Catch
和 Concat
运算符以保留最后一个值,即使在源序列正常或异常完成的情况下也是如此。
就我个人而言,我会犹豫是否使用此解决方案,因为在 Do
运算符中更新的 volatile
字段会更自然地完成同样的事情。我发布它主要是为了演示 Rx 功能。
我在 class 中有一个 IObservable<Item>
,我想公开一个只读的 属性,它提供在给定时间推送到可观察对象的最后一个项目。所以它将提供单个值Item
。
如果没有值被推送,那么它必须return一个默认值。
如何在不订阅 observable 和 "backing field" 的情况下做到这一点?
假设有一个热可观察对象。
对于observable = source.Replay(1); observable.Connect();
提供值:
public int Value =>
observable.Take(1).Amb(Observable.Return(defaultValue)).Wait();
这将 return 一个默认值,以防没有任何值被推送。
您想要从 Reactive 过渡到 state,因此支持字段不是一个糟糕的选择。你提到你不想订阅,但要观察任何东西:某事,某处必须订阅。
只是在这里补充一下@Asti 的回答,也许可以帮助您解决挫败感:
Observable 不是物理的 'thing',它更像是一个逻辑概念。经常将 Rx 与 LINQ 进行比较,并且在很多时候这是一个公平的比较。但当您开始谈论数据结构时,它会崩溃:LINQ 的枚举与用于学习目的的列表非常相似。
但是,在 Rx 方面,根本没有与 List 等效的东西。可观察对象是一种瞬态数据结构,所有运算符都处理这种瞬态。如果您正在寻找永久状态,那么您将离开 Rx。
话虽如此,将可观察对象转换为某种状态是一个常见问题,并且有一些包可以帮助您:ReactiveUI 可能是最著名的。 ReactiveProperty 是另一个。这两个包都有缺陷,但可能对你有帮助。
如果您只是想寻找一种更简单的方法来获得支持字段,而无需 boiler-plating 支持字段,这将有效:
public static class ReactivePropertyExtensions
{
public static ReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source)
{
return new ReactiveProperty<T>(source);
}
public static ReactiveProperty<T> ToReactiveProperty<T>(this IObservable<T> source, T defaultValue)
{
return new ReactiveProperty<T>(source, defaultValue);
}
}
public class ReactiveProperty<T> : IDisposable
{
private IObservable<T> Source { get; }
private IDisposable Subscription { get; }
public T Value { get; private set; }
public ReactiveProperty(IObservable<T> source)
: this(source, default(T)) { }
public ReactiveProperty(IObservable<T> source, T defaultValue)
{
Value = defaultValue;
Source = source;
Subscription = source.Subscribe(t => Value = t);
}
public void Dispose()
{
Subscription.Dispose();
}
}
使用示例:
var ticker = Observable.Interval(TimeSpan.FromSeconds(1))
.Publish().RefCount();
var latestTickerValue = ticker.ToReactiveProperty();
Console.WriteLine(latestTickerValue.Value);
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine(latestTickerValue.Value);
await Task.Delay(TimeSpan.FromSeconds(3));
Console.WriteLine(latestTickerValue.Value);
这是另一种定义 Value
属性 的方式,本着 Asti 的
private readonly IObservable<Item> _source;
private readonly IObservable<Item> _lastValue;
public SomeClass() // Constructor
{
_source = /* Initialize the source observable (hot) */
_lastValue = _source
.Catch(Observable.Never<Item>())
.Concat(Observable.Never<Item>())
.Publish(default)
.AutoConnect(0)
.FirstAsync();
}
public Item Value => _lastValue.Wait();
接受initialValue
参数的Publish
运算符...
Returns a connectable observable sequence that shares a single subscription to the underlying sequence and starts with
initialValue
. This operator is a specialization ofMulticast
using aBehaviorSubject<T>
.
BehaviorSubject<T>
是一个专门的ISubject<T>
...
Represents a value that changes over time. Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications.
已添加 Catch
和 Concat
运算符以保留最后一个值,即使在源序列正常或异常完成的情况下也是如此。
就我个人而言,我会犹豫是否使用此解决方案,因为在 Do
运算符中更新的 volatile
字段会更自然地完成同样的事情。我发布它主要是为了演示 Rx 功能。