Observables 的可扩展组合
Extensible Combination of Observables
我想将列表的 IObservable 存储在一个容器中,并订阅这些可观察的组合以检索合并的列表。然后我希望能够添加更多的 Observables 而无需更新订阅并仍然获得新的结果。理想情况下,它也应该在将新的可观察对象添加到商店时触发。下面的代码应该解释得更多:
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Linq;
namespace dynamic_combine
{
class ObservableStuff
{
private List<IObservable<List<String>>> _listOfObservables = new List<IObservable<List<String>>>();
public ObservableStuff() { }
public void AddObservable(IObservable<List<String>> obs)
{
_listOfObservables.Add(obs);
}
public IObservable<IList<String>> GetCombinedObservable()
{
return Observable.CombineLatest(_listOfObservables)
.Select((all) =>
{
List<String> mergedList = new List<String>();
foreach(var list in all)
{
mergedList = mergedList.Concat(list).ToList();
}
return mergedList;
});
}
}
class Program
{
static void Main(string[] args)
{
ObservableStuff Stuff = new ObservableStuff();
BehaviorSubject<List<String>> A = new BehaviorSubject<List<String>>(new List<String>() { "a", "b", "c" });
BehaviorSubject<List<String>> B = new BehaviorSubject<List<String>>(new List<String>() { "d", "e", "f" });
BehaviorSubject<List<String>> C = new BehaviorSubject<List<String>>(new List<String>() { "x", "y", "z" });
Stuff.AddObservable(A.AsObservable());
Stuff.AddObservable(B.AsObservable());
Stuff.GetCombinedObservable().Subscribe((x) =>
{
Console.WriteLine(String.Join(",", x));
});
// Initial Output: a,b,c,d,e,f
A.OnNext(new List<String>() { "1", "2", "3", "4", "5" });
// Output: 1,2,3,4,5,d,e,f
B.OnNext(new List<String>() { "6", "7", "8", "9", "0" });
// Output: 1,2,3,4,5,6,7,8,9,0
Stuff.AddObservable(C.AsObservable());
// Wishful Output: 1,2,3,4,5,6,7,8,9,0,x,y,z
C.OnNext(new List<String>() { "y", "e", "a", "h" });
// Wishful Output: 1,2,3,4,5,6,7,8,9,0,y,e,a,h
Console.WriteLine("Press the any key...");
Console.ReadKey();
}
}
}
虽然这个例子是用C#写的,但最终还是需要用rxCpp来实现。看到 Rx 的其他变体中的实现也很有趣。
我已经建立了一个存储库来检查代码,并且可能会将其扩展到其他语言:
https://gitlab.com/dwaldorf/rx-examples
BR,
丹尼尔
首先,进行一些更改,因为您的代码不那么容易阅读。 GetCombinedObservable
可以重写为:
public IObservable<IList<String>> GetCombinedObservable()
{
return Observable.CombineLatest(_listOfObservables)
.Select(l => l.SelectMany(s => s).ToList());
}
您的问题归结为两件事:您希望 _listOfObservables
是动态的,这意味着将其从 List<IObservable<T>>
更改为 IObservable<IObservable<T>>
。但是,这样做的问题是 CombineLatest
不支持 IObservable<IObservable<T>>
,因此我们必须创建一个。
这给我们带来了这个有趣、丑陋的小功能(使用 nuget 包 System.Collections.Immutable
):
public static class X
{
public static IObservable<List<T>> DynamicCombineLatest<T>(this IObservable<IObservable<T>> source)
{
return source
.SelectMany((o, i) => o.Select(item => (observableIndex: i, item: item)))
.Scan(ImmutableDictionary<int, T>.Empty, (state, t) => state.SetItem(t.observableIndex, t.item))
.Select(dict => dict.OrderBy(kvp => kvp.Key).Select(kvp => kvp.Value).ToList());
}
}
现在我们可以更新您的 class:
class ObservableStuff
{
private ReplaySubject<IObservable<List<String>>> _subject = new ReplaySubject<IObservable<List<String>>>(int.MaxValue);
public ObservableStuff() { }
public void AddObservable(IObservable<List<String>> obs)
{
_subject.OnNext(obs);
}
public IObservable<IList<String>> GetCombinedObservable()
{
return _subject
.DynamicCombineLatest()
.Select(l => l.SelectMany(s => s).ToList());
}
}
我想将列表的 IObservable 存储在一个容器中,并订阅这些可观察的组合以检索合并的列表。然后我希望能够添加更多的 Observables 而无需更新订阅并仍然获得新的结果。理想情况下,它也应该在将新的可观察对象添加到商店时触发。下面的代码应该解释得更多:
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Linq;
namespace dynamic_combine
{
class ObservableStuff
{
private List<IObservable<List<String>>> _listOfObservables = new List<IObservable<List<String>>>();
public ObservableStuff() { }
public void AddObservable(IObservable<List<String>> obs)
{
_listOfObservables.Add(obs);
}
public IObservable<IList<String>> GetCombinedObservable()
{
return Observable.CombineLatest(_listOfObservables)
.Select((all) =>
{
List<String> mergedList = new List<String>();
foreach(var list in all)
{
mergedList = mergedList.Concat(list).ToList();
}
return mergedList;
});
}
}
class Program
{
static void Main(string[] args)
{
ObservableStuff Stuff = new ObservableStuff();
BehaviorSubject<List<String>> A = new BehaviorSubject<List<String>>(new List<String>() { "a", "b", "c" });
BehaviorSubject<List<String>> B = new BehaviorSubject<List<String>>(new List<String>() { "d", "e", "f" });
BehaviorSubject<List<String>> C = new BehaviorSubject<List<String>>(new List<String>() { "x", "y", "z" });
Stuff.AddObservable(A.AsObservable());
Stuff.AddObservable(B.AsObservable());
Stuff.GetCombinedObservable().Subscribe((x) =>
{
Console.WriteLine(String.Join(",", x));
});
// Initial Output: a,b,c,d,e,f
A.OnNext(new List<String>() { "1", "2", "3", "4", "5" });
// Output: 1,2,3,4,5,d,e,f
B.OnNext(new List<String>() { "6", "7", "8", "9", "0" });
// Output: 1,2,3,4,5,6,7,8,9,0
Stuff.AddObservable(C.AsObservable());
// Wishful Output: 1,2,3,4,5,6,7,8,9,0,x,y,z
C.OnNext(new List<String>() { "y", "e", "a", "h" });
// Wishful Output: 1,2,3,4,5,6,7,8,9,0,y,e,a,h
Console.WriteLine("Press the any key...");
Console.ReadKey();
}
}
}
虽然这个例子是用C#写的,但最终还是需要用rxCpp来实现。看到 Rx 的其他变体中的实现也很有趣。
我已经建立了一个存储库来检查代码,并且可能会将其扩展到其他语言: https://gitlab.com/dwaldorf/rx-examples
BR, 丹尼尔
首先,进行一些更改,因为您的代码不那么容易阅读。 GetCombinedObservable
可以重写为:
public IObservable<IList<String>> GetCombinedObservable()
{
return Observable.CombineLatest(_listOfObservables)
.Select(l => l.SelectMany(s => s).ToList());
}
您的问题归结为两件事:您希望 _listOfObservables
是动态的,这意味着将其从 List<IObservable<T>>
更改为 IObservable<IObservable<T>>
。但是,这样做的问题是 CombineLatest
不支持 IObservable<IObservable<T>>
,因此我们必须创建一个。
这给我们带来了这个有趣、丑陋的小功能(使用 nuget 包 System.Collections.Immutable
):
public static class X
{
public static IObservable<List<T>> DynamicCombineLatest<T>(this IObservable<IObservable<T>> source)
{
return source
.SelectMany((o, i) => o.Select(item => (observableIndex: i, item: item)))
.Scan(ImmutableDictionary<int, T>.Empty, (state, t) => state.SetItem(t.observableIndex, t.item))
.Select(dict => dict.OrderBy(kvp => kvp.Key).Select(kvp => kvp.Value).ToList());
}
}
现在我们可以更新您的 class:
class ObservableStuff
{
private ReplaySubject<IObservable<List<String>>> _subject = new ReplaySubject<IObservable<List<String>>>(int.MaxValue);
public ObservableStuff() { }
public void AddObservable(IObservable<List<String>> obs)
{
_subject.OnNext(obs);
}
public IObservable<IList<String>> GetCombinedObservable()
{
return _subject
.DynamicCombineLatest()
.Select(l => l.SelectMany(s => s).ToList());
}
}