如何获取 IObservable<IObservable<T>> 的最新变化事件?
How to get latest changed events of IObservable<IObservable<T>>?
我的系统有很多状态对象——连接状态、cpu负载、登录用户等等。所有此类事件都合并到单个可观察流中。
我想制作一个管理实用程序来显示系统的实际状态并显示所有计数器。
我如何创建一个包含所有计数器最后更改值的列表的可观察对象?
这是我想要的弹珠图:
s1 (cpu): -s1_v1----s1_v1---s1_v2
s2 (users count): --s2_v1--s2_v1---------s2_v2
s3 (some cat purr/sec) ----s3_v1----s3_v1----s3_v1
flatten sequence: s1_v1-s2_v1-s3_v1-s2_v1-s1_v1-s3_v1-s1_v2-s3_v1-s2_v2
期望的输出:
s1_v1|s1_v1|s1_v1|s1_v2|s1_v2
s2_v1|s2_v1|s2_v1|s2_v2
s3_v1|s3_v1|s3_v1
到目前为止我可以实现这个:
public class StatusImplementation
{
public static IObservable<IDictionary<TKey, TValue>> Status<TKey, TValue>(
params IObservable<KeyValuePair<TKey, TValue>>[] observables)
{
var uniqueObservables = observables
.Select(x => x.Publish().RefCount().DistinctUntilChanged());
return Observable.Create<IDictionary<TKey, TValue>>(o =>
{
var compositeDisposable = new CompositeDisposable();
var dictionary = new Dictionary<TKey, TValue>();
foreach (var uniqueObservable in uniqueObservables)
{
var disposable = uniqueObservable.Subscribe(x =>
{
if (dictionary.ContainsKey(x.Key) && !dictionary[x.Key].Equals(x.Value))
{
var newDictionary = new Dictionary<TKey, TValue>(dictionary);
newDictionary[x.Key] = x.Value;
dictionary = newDictionary;
}
else
{
dictionary.Add(x.Key, x.Value);
}
o.OnNext(dictionary);
});
compositeDisposable.Add(disposable);
}
return compositeDisposable;
});
}
}
这是一个用法示例:
var f1 = Observable.Interval(TimeSpan.FromMilliseconds(1000))
.Select(x => new KeyValuePair<string, long>("event 1", x));
var f2 = Observable.Interval(TimeSpan.FromMilliseconds(1200))
.Select(x => new KeyValuePair<string, long>("event 2", x));
var f3 = Observable.Interval(TimeSpan.FromMilliseconds(1250))
.Select(x => new KeyValuePair<string, long>("event 3", x));
var combined = f1.Merge(f2).Merge(f3);
StatusImplementation.Status(f1, f2, f3)
.Select(x => string.Join(", ", x.ToList()))
.Dump("\tstatus");
combined.Dump("normal");
和 Dump 函数(来自 great Lee Campbell 的书):
public static void Dump<T>(this IObservable<T> source, string name)
{
source.Subscribe(
i => Console.WriteLine("{0}-->{1}", name, i),
ex => Console.WriteLine("{0} failed-->{1}", name, ex.Message),
() => Console.WriteLine("{0} completed", name));
}
所以问题是:有没有更好的方法来实现这个功能?可能没有在 observable 中使用 Dictionary?
谢谢。
您可以使用 Observable.CombineLatest
,它会在每次新值到达时从所有 observable 发出最新值。那你就不用用字典了
var f1 = Observable.Interval(TimeSpan.FromMilliseconds(1000))
.Select(x = > new KeyValuePair < string, long > ("event 1", x));
var f2 = Observable.Interval(TimeSpan.FromMilliseconds(1200))
.Select(x = > new KeyValuePair < string, long > ("event 2", x));
var f3 = Observable.Interval(TimeSpan.FromMilliseconds(1250))
.Select(x = > new KeyValuePair < string, long > ("event 3", x));
var combined = f1.Merge(f2).Merge(f3);
Observable.CombineLatest(f1, f2, f3)
.Select(x = > string.Join(", ", x.ToList()))
.Dump("\tstatus");
combined.Dump("normal");
因此,如果您从 combined
可观察对象开始——它可以从任意数量的源可观察对象生成——那么您可以这样做:
var query =
combined
.Scan(
new Dictionary<string, long>() as IDictionary<string, long>,
(d, kvp) =>
{
var d2 = new Dictionary<string, long>(d) as IDictionary<string, long>;
d2[kvp.Key] = kvp.Value;
return d2;
});
这将为 combined
可观察对象生成的每个值 return 一系列字典对象。每个字典对象都将是一个不同的实例 - 如果同一个实例被 returned 你会不断改变值,这可能会导致线程问题。
我的系统有很多状态对象——连接状态、cpu负载、登录用户等等。所有此类事件都合并到单个可观察流中。
我想制作一个管理实用程序来显示系统的实际状态并显示所有计数器。
我如何创建一个包含所有计数器最后更改值的列表的可观察对象?
这是我想要的弹珠图:
s1 (cpu): -s1_v1----s1_v1---s1_v2
s2 (users count): --s2_v1--s2_v1---------s2_v2
s3 (some cat purr/sec) ----s3_v1----s3_v1----s3_v1
flatten sequence: s1_v1-s2_v1-s3_v1-s2_v1-s1_v1-s3_v1-s1_v2-s3_v1-s2_v2
期望的输出:
s1_v1|s1_v1|s1_v1|s1_v2|s1_v2
s2_v1|s2_v1|s2_v1|s2_v2
s3_v1|s3_v1|s3_v1
到目前为止我可以实现这个:
public class StatusImplementation
{
public static IObservable<IDictionary<TKey, TValue>> Status<TKey, TValue>(
params IObservable<KeyValuePair<TKey, TValue>>[] observables)
{
var uniqueObservables = observables
.Select(x => x.Publish().RefCount().DistinctUntilChanged());
return Observable.Create<IDictionary<TKey, TValue>>(o =>
{
var compositeDisposable = new CompositeDisposable();
var dictionary = new Dictionary<TKey, TValue>();
foreach (var uniqueObservable in uniqueObservables)
{
var disposable = uniqueObservable.Subscribe(x =>
{
if (dictionary.ContainsKey(x.Key) && !dictionary[x.Key].Equals(x.Value))
{
var newDictionary = new Dictionary<TKey, TValue>(dictionary);
newDictionary[x.Key] = x.Value;
dictionary = newDictionary;
}
else
{
dictionary.Add(x.Key, x.Value);
}
o.OnNext(dictionary);
});
compositeDisposable.Add(disposable);
}
return compositeDisposable;
});
}
}
这是一个用法示例:
var f1 = Observable.Interval(TimeSpan.FromMilliseconds(1000))
.Select(x => new KeyValuePair<string, long>("event 1", x));
var f2 = Observable.Interval(TimeSpan.FromMilliseconds(1200))
.Select(x => new KeyValuePair<string, long>("event 2", x));
var f3 = Observable.Interval(TimeSpan.FromMilliseconds(1250))
.Select(x => new KeyValuePair<string, long>("event 3", x));
var combined = f1.Merge(f2).Merge(f3);
StatusImplementation.Status(f1, f2, f3)
.Select(x => string.Join(", ", x.ToList()))
.Dump("\tstatus");
combined.Dump("normal");
和 Dump 函数(来自 great Lee Campbell 的书):
public static void Dump<T>(this IObservable<T> source, string name)
{
source.Subscribe(
i => Console.WriteLine("{0}-->{1}", name, i),
ex => Console.WriteLine("{0} failed-->{1}", name, ex.Message),
() => Console.WriteLine("{0} completed", name));
}
所以问题是:有没有更好的方法来实现这个功能?可能没有在 observable 中使用 Dictionary?
谢谢。
您可以使用 Observable.CombineLatest
,它会在每次新值到达时从所有 observable 发出最新值。那你就不用用字典了
var f1 = Observable.Interval(TimeSpan.FromMilliseconds(1000))
.Select(x = > new KeyValuePair < string, long > ("event 1", x));
var f2 = Observable.Interval(TimeSpan.FromMilliseconds(1200))
.Select(x = > new KeyValuePair < string, long > ("event 2", x));
var f3 = Observable.Interval(TimeSpan.FromMilliseconds(1250))
.Select(x = > new KeyValuePair < string, long > ("event 3", x));
var combined = f1.Merge(f2).Merge(f3);
Observable.CombineLatest(f1, f2, f3)
.Select(x = > string.Join(", ", x.ToList()))
.Dump("\tstatus");
combined.Dump("normal");
因此,如果您从 combined
可观察对象开始——它可以从任意数量的源可观察对象生成——那么您可以这样做:
var query =
combined
.Scan(
new Dictionary<string, long>() as IDictionary<string, long>,
(d, kvp) =>
{
var d2 = new Dictionary<string, long>(d) as IDictionary<string, long>;
d2[kvp.Key] = kvp.Value;
return d2;
});
这将为 combined
可观察对象生成的每个值 return 一系列字典对象。每个字典对象都将是一个不同的实例 - 如果同一个实例被 returned 你会不断改变值,这可能会导致线程问题。