在 C# 中使用 RX 获取可观察序列中的最新项
Getting the latest item in an observable sequence using RX in C#
以下面为例:
var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();
我在这里试图实现的是在任何给定时间获取序列中最新项目的值"synchronously"。这意味着像 FirstAsync
这样的扩展无法弥补我的不足。
StartWith
和 Replay
位确保始终有一个值,并且 RefCount
位在我的实际代码中是必要的,以检测我何时可以执行一些处置操作.
因此,为了模拟这个 "any given time" 部分,让我们尝试在 5 秒后获取最新值:
Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
// Try to get latest value from "ob" here.
});
因此,延迟 5 秒后,我需要从序列中获取值 5
,这些是我迄今为止尝试过但没有成功的方法:
ob.First()
- returns 500
ob.Latest().Take(1)
- 同上
ob.MostRecent(-1).First()
- 同上
ob.MostRecent(-1)
- 给我一个 IEnumerable<long>
满的“500”
ob.Last()
- 永远不会 returns 因为它正在等待序列完成它永远不会
ob.Latest().Last()
- 同上
ob.ToTask().Result
- 同上
ob.ToEnumerable()
- 同上
ob.MostRecent().Last()
同上
似乎周围没有太多资源可以让人们真正做到这一点。我能找到的最接近的是:“Rx: operator for getting first and most recent value from an Observable stream”,但它毕竟不是同步调用(仍在使用订阅)所以它对我不起作用。
有人知道这是否真的可行吗?
我不确定这个答案是否对您有帮助,但您是否调查过 BehaviorSubject?这是一个记住其最新值的 IObservable。它有点像一个常规变量和一个可观察变量的组合。
否则,你为什么不订阅 'ob' 并自己将最新值存储在变量中?
你需要做这样的事情:
var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500);
var latestAndThenTheRest =
Observable
.Create<long>(o =>
{
var bs = new BehaviorSubject<long>(1);
var s1 = ob.Subscribe(bs);
var s2 = bs.Subscribe(o);
return new CompositeDisposable(s1, s2);
});
你在这里唯一需要考虑的是 ob
必须是一个热可观察的,这才有意义。如果天气很冷,那么每个订阅者都会在 ob
序列的开头获得一个全新的订阅。
指出您的代码可能无法按预期工作的原因
var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();
//Note at this point `ob` has never been subscribed to,
// so the Reference-count is 0 i.e. has not be connected.
Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
// Try to get latest value from "ob" here.
//Here we make our first subscription to the `ob` sequence.
// This will connect the sequence (invoke subscribe)
// which will
// 1) invoke StartWith
// 2) invoke onNext(500)
// 3) invoke First()
// 4) First() will then unsubscribe() as it has the single value it needs
// 5) The refCount will now return to 0
// 6) The sequence will be unsubscribed to.
ob.First().Dump();
//Any future calls like `ob.First()` will thus always get the value 500.
});
可能你想要的是
var ob = Observable.Interval(TimeSpan.FromSeconds(1))
.Publish(500);
var connection = ob.Connect();
//Note at this point `ob` has never been subscribed to, so the ReferenceCount is 0 i.e. has not be connected.
var subscription = Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
// Try to get latest value from "ob" here.
ob.First().Dump();
});
//Sometime later
subscription.Dispose();
connection.Dispose()
但是,您真的不想将同步调用与 Rx 混合使用。您通常也不希望在订阅中进行订阅(因为 .First()
是订阅)。您可能想做的是获取最新值,并将其存储在某个地方。使用 .First()
只是一个滑坡。你可能会更好地写一些像
这样的东西
var subscription = Observable.Timer(TimeSpan.FromSeconds(5))
.SelectMany(_=>ob.Take(1))
.Subscribe(x =>
{
//Do something with X here.
x.Dump();
});
澄清一下,感谢@LeeCampbell 的回答。
什么不起作用:
var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();
Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
ob.First().Dump();
// This gives you 500.
// Because this is the first time any one subscribes to the observable,
// so it starts right here and gives you the initial value.
});
实际可行的方法:
var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();
ob.Subscribe(); // Subscribe to start the above hot observable immediately.
Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
ob.First().Dump();
// This would give you either 3 or 4, depending on the speed and timing of your computer.
});
以下面为例:
var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();
我在这里试图实现的是在任何给定时间获取序列中最新项目的值"synchronously"。这意味着像 FirstAsync
这样的扩展无法弥补我的不足。
StartWith
和 Replay
位确保始终有一个值,并且 RefCount
位在我的实际代码中是必要的,以检测我何时可以执行一些处置操作.
因此,为了模拟这个 "any given time" 部分,让我们尝试在 5 秒后获取最新值:
Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
// Try to get latest value from "ob" here.
});
因此,延迟 5 秒后,我需要从序列中获取值 5
,这些是我迄今为止尝试过但没有成功的方法:
ob.First()
- returns 500ob.Latest().Take(1)
- 同上ob.MostRecent(-1).First()
- 同上ob.MostRecent(-1)
- 给我一个IEnumerable<long>
满的“500”ob.Last()
- 永远不会 returns 因为它正在等待序列完成它永远不会ob.Latest().Last()
- 同上ob.ToTask().Result
- 同上ob.ToEnumerable()
- 同上ob.MostRecent().Last()
同上
似乎周围没有太多资源可以让人们真正做到这一点。我能找到的最接近的是:“Rx: operator for getting first and most recent value from an Observable stream”,但它毕竟不是同步调用(仍在使用订阅)所以它对我不起作用。
有人知道这是否真的可行吗?
我不确定这个答案是否对您有帮助,但您是否调查过 BehaviorSubject?这是一个记住其最新值的 IObservable。它有点像一个常规变量和一个可观察变量的组合。
否则,你为什么不订阅 'ob' 并自己将最新值存储在变量中?
你需要做这样的事情:
var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500);
var latestAndThenTheRest =
Observable
.Create<long>(o =>
{
var bs = new BehaviorSubject<long>(1);
var s1 = ob.Subscribe(bs);
var s2 = bs.Subscribe(o);
return new CompositeDisposable(s1, s2);
});
你在这里唯一需要考虑的是 ob
必须是一个热可观察的,这才有意义。如果天气很冷,那么每个订阅者都会在 ob
序列的开头获得一个全新的订阅。
指出您的代码可能无法按预期工作的原因
var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();
//Note at this point `ob` has never been subscribed to,
// so the Reference-count is 0 i.e. has not be connected.
Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
// Try to get latest value from "ob" here.
//Here we make our first subscription to the `ob` sequence.
// This will connect the sequence (invoke subscribe)
// which will
// 1) invoke StartWith
// 2) invoke onNext(500)
// 3) invoke First()
// 4) First() will then unsubscribe() as it has the single value it needs
// 5) The refCount will now return to 0
// 6) The sequence will be unsubscribed to.
ob.First().Dump();
//Any future calls like `ob.First()` will thus always get the value 500.
});
可能你想要的是
var ob = Observable.Interval(TimeSpan.FromSeconds(1))
.Publish(500);
var connection = ob.Connect();
//Note at this point `ob` has never been subscribed to, so the ReferenceCount is 0 i.e. has not be connected.
var subscription = Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
// Try to get latest value from "ob" here.
ob.First().Dump();
});
//Sometime later
subscription.Dispose();
connection.Dispose()
但是,您真的不想将同步调用与 Rx 混合使用。您通常也不希望在订阅中进行订阅(因为 .First()
是订阅)。您可能想做的是获取最新值,并将其存储在某个地方。使用 .First()
只是一个滑坡。你可能会更好地写一些像
var subscription = Observable.Timer(TimeSpan.FromSeconds(5))
.SelectMany(_=>ob.Take(1))
.Subscribe(x =>
{
//Do something with X here.
x.Dump();
});
澄清一下,感谢@LeeCampbell 的回答。
什么不起作用:
var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();
Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
ob.First().Dump();
// This gives you 500.
// Because this is the first time any one subscribes to the observable,
// so it starts right here and gives you the initial value.
});
实际可行的方法:
var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();
ob.Subscribe(); // Subscribe to start the above hot observable immediately.
Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
ob.First().Dump();
// This would give you either 3 or 4, depending on the speed and timing of your computer.
});