如何在 Rx.NET 中组合两个不同的 GroupedStreams?
How to combine two different GroupedStreams in Rx.NET?
类似,但不适用于我的情况,因为用户需要来自同一个 IGroupedObservable 的合并可观察流,而我想合并来自不同组的流。
我有以下结构和流:
type A = {
Id: int
Value: int
}
type B = {
Id: int
Value: int
}
//subjects to test input, just any source of As and Bs
let subjectA: Subject<A> = Subject.broadcast
let subjectB: Subject<B> = Subject.broadcast
//grouped streams
let groupedA: IObservable<<IGroupedObservable<int, A>> = Observable.groupBy (fun a -> a.Id) subjectA
let groupedB: IObservable<<IGroupedObservable<int, B>> = Observable.groupBy (fun b -> b.Id) subjectB
我想在 groupedA.Key = groupedB.Key 时以某种方式合并 A 和 B 的内部可观察量,并获得 (A, B) 对的可观察量,其中 A.Id = B.Id
我想要的签名是这样的
IObservable<IGroupedObservable<int, A>> -> IObservable<IGroupedObservable<int, B>> -> IObservable<IGroupedObservable<int, (A, B)>>
其中对于所有 (A, B),A.Id = B.Id
我尝试了一堆 combineLatest、groupJoin、过滤器和地图变体,但没有成功。
我将 F# 与 Rx.Net 和 FSharp.Control.Reactive 一起使用,但如果您知道 C#(或任何语言,真的)的答案,请 post 它
首先,这里有你想要的签名:
let cartesian left right =
rxquery {
for a in left do
for b in right do
yield a, b
}
let mergeGroups left right =
rxquery {
for (leftGroup : IGroupedObservable<'key, 'a>) in left do
for (rightGroup : IGroupedObservable<'key, 'b>) in right do
if leftGroup.Key = rightGroup.Key then
let merged = cartesian leftGroup rightGroup
yield {
new IGroupedObservable<_, _> with
member __.Key = leftGroup.Key
member __.Subscribe(observer) = merged.Subscribe(observer)
}
}
但是,在我的测试中,这些组都是空的。我没有足够的 Rx 经验知道为什么,但也许其他人知道。
这是您可以使用的自定义运算符 GroupJoin
。它基于 Select
、Merge
、GroupBy
和 Where
运算符:
/// <summary>
/// Groups and joins the elements of two observable sequences, based on common keys.
/// </summary>
public static IObservable<(TKey Key, IObservable<TLeft> Left, IObservable<TRight> Right)>
GroupJoin<TLeft, TRight, TKey>(
this IObservable<TLeft> left,
IObservable<TRight> right,
Func<TLeft, TKey> leftKeySelector,
Func<TRight, TKey> rightKeySelector,
IEqualityComparer<TKey> keyComparer = null)
{
// Arguments validation omitted
keyComparer ??= EqualityComparer<TKey>.Default;
return left
.Select(x => (x, (TRight)default, Type: 1, Key: leftKeySelector(x)))
.Merge(right.Select(x => ((TLeft)default, x, Type: 2, Key: rightKeySelector(x))))
.GroupBy(e => e.Key, keyComparer)
.Select(g => (
g.Key,
g.Where(e => e.Type == 1).Select(e => e.Item1),
g.Where(e => e.Type == 2).Select(e => e.Item2)
));
}
用法示例:
var subjectA = new Subject<A>();
var subjectB = new Subject<B>();
IObservable<IGroupedObservable<int, (A, B)>> query = subjectA
.GroupJoin(subjectB, a => a.Id, b => b.Id)
.SelectMany(g => g.Left.Zip(g.Right, (a, b) => (g.Key, a, b)))
.GroupBy(e => e.Key, e => (e.a, e.b));
我不清楚这是否是您想要的。因此,首先使用运行器代码进行澄清可能会有所帮助。假设以下运行器代码:
var aSubject = new Subject<A>();
var bSubject = new Subject<B>();
var groupedA = aSubject.GroupBy(a => a.Id);
var groupedB = bSubject.GroupBy(b => b.Id);
//Initiate solution
solution.Merge()
.Subscribe(t => Console.WriteLine($"(Id = {t.a.Id}, AValue = {t.a.Value}, BValue = {t.b.Value} )"));
aSubject.OnNext(new A() { Id = 1, Value = 1 });
aSubject.OnNext(new A() { Id = 1, Value = 2 });
bSubject.OnNext(new B() { Id = 1, Value = 10 });
bSubject.OnNext(new B() { Id = 1, Value = 20 });
bSubject.OnNext(new B() { Id = 1, Value = 30 });
是否要查看以下输出:
(Id = 1, AValue = 1, BValue = 10)
(Id = 1, AValue = 2, BValue = 10)
(Id = 1, AValue = 1, BValue = 20)
(Id = 1, AValue = 2, BValue = 20)
(Id = 1, AValue = 1, BValue = 30)
(Id = 1, AValue = 2, BValue = 30)
如果是这样,您可以通过以下方式解决:
var solution = groupedA.Merge()
.Join(groupedB.Merge(),
_ => Observable.Never<Unit>(),
_ => Observable.Never<Unit>(),
(a, b) => (a, b)
)
.Where(t => t.a.Id == t.b.Id)
.GroupBy(g => g.a.Id);
我要提醒的是,如果这是长期 运行 过程的一部分,这里会有 memory/performance 影响。这会将所有 A
和 B
对象无限期地保存在内存中,等待它们是否可以配对。要缩短它们在内存中保留的时间,请将 Observable.Never()
调用更改为适当的 windows 以决定每个对象在内存中保留多长时间。
我有以下结构和流:
type A = {
Id: int
Value: int
}
type B = {
Id: int
Value: int
}
//subjects to test input, just any source of As and Bs
let subjectA: Subject<A> = Subject.broadcast
let subjectB: Subject<B> = Subject.broadcast
//grouped streams
let groupedA: IObservable<<IGroupedObservable<int, A>> = Observable.groupBy (fun a -> a.Id) subjectA
let groupedB: IObservable<<IGroupedObservable<int, B>> = Observable.groupBy (fun b -> b.Id) subjectB
我想在 groupedA.Key = groupedB.Key 时以某种方式合并 A 和 B 的内部可观察量,并获得 (A, B) 对的可观察量,其中 A.Id = B.Id
我想要的签名是这样的
IObservable<IGroupedObservable<int, A>> -> IObservable<IGroupedObservable<int, B>> -> IObservable<IGroupedObservable<int, (A, B)>>
其中对于所有 (A, B),A.Id = B.Id
我尝试了一堆 combineLatest、groupJoin、过滤器和地图变体,但没有成功。
我将 F# 与 Rx.Net 和 FSharp.Control.Reactive 一起使用,但如果您知道 C#(或任何语言,真的)的答案,请 post 它
首先,这里有你想要的签名:
let cartesian left right =
rxquery {
for a in left do
for b in right do
yield a, b
}
let mergeGroups left right =
rxquery {
for (leftGroup : IGroupedObservable<'key, 'a>) in left do
for (rightGroup : IGroupedObservable<'key, 'b>) in right do
if leftGroup.Key = rightGroup.Key then
let merged = cartesian leftGroup rightGroup
yield {
new IGroupedObservable<_, _> with
member __.Key = leftGroup.Key
member __.Subscribe(observer) = merged.Subscribe(observer)
}
}
但是,在我的测试中,这些组都是空的。我没有足够的 Rx 经验知道为什么,但也许其他人知道。
这是您可以使用的自定义运算符 GroupJoin
。它基于 Select
、Merge
、GroupBy
和 Where
运算符:
/// <summary>
/// Groups and joins the elements of two observable sequences, based on common keys.
/// </summary>
public static IObservable<(TKey Key, IObservable<TLeft> Left, IObservable<TRight> Right)>
GroupJoin<TLeft, TRight, TKey>(
this IObservable<TLeft> left,
IObservable<TRight> right,
Func<TLeft, TKey> leftKeySelector,
Func<TRight, TKey> rightKeySelector,
IEqualityComparer<TKey> keyComparer = null)
{
// Arguments validation omitted
keyComparer ??= EqualityComparer<TKey>.Default;
return left
.Select(x => (x, (TRight)default, Type: 1, Key: leftKeySelector(x)))
.Merge(right.Select(x => ((TLeft)default, x, Type: 2, Key: rightKeySelector(x))))
.GroupBy(e => e.Key, keyComparer)
.Select(g => (
g.Key,
g.Where(e => e.Type == 1).Select(e => e.Item1),
g.Where(e => e.Type == 2).Select(e => e.Item2)
));
}
用法示例:
var subjectA = new Subject<A>();
var subjectB = new Subject<B>();
IObservable<IGroupedObservable<int, (A, B)>> query = subjectA
.GroupJoin(subjectB, a => a.Id, b => b.Id)
.SelectMany(g => g.Left.Zip(g.Right, (a, b) => (g.Key, a, b)))
.GroupBy(e => e.Key, e => (e.a, e.b));
我不清楚这是否是您想要的。因此,首先使用运行器代码进行澄清可能会有所帮助。假设以下运行器代码:
var aSubject = new Subject<A>();
var bSubject = new Subject<B>();
var groupedA = aSubject.GroupBy(a => a.Id);
var groupedB = bSubject.GroupBy(b => b.Id);
//Initiate solution
solution.Merge()
.Subscribe(t => Console.WriteLine($"(Id = {t.a.Id}, AValue = {t.a.Value}, BValue = {t.b.Value} )"));
aSubject.OnNext(new A() { Id = 1, Value = 1 });
aSubject.OnNext(new A() { Id = 1, Value = 2 });
bSubject.OnNext(new B() { Id = 1, Value = 10 });
bSubject.OnNext(new B() { Id = 1, Value = 20 });
bSubject.OnNext(new B() { Id = 1, Value = 30 });
是否要查看以下输出:
(Id = 1, AValue = 1, BValue = 10)
(Id = 1, AValue = 2, BValue = 10)
(Id = 1, AValue = 1, BValue = 20)
(Id = 1, AValue = 2, BValue = 20)
(Id = 1, AValue = 1, BValue = 30)
(Id = 1, AValue = 2, BValue = 30)
如果是这样,您可以通过以下方式解决:
var solution = groupedA.Merge()
.Join(groupedB.Merge(),
_ => Observable.Never<Unit>(),
_ => Observable.Never<Unit>(),
(a, b) => (a, b)
)
.Where(t => t.a.Id == t.b.Id)
.GroupBy(g => g.a.Id);
我要提醒的是,如果这是长期 运行 过程的一部分,这里会有 memory/performance 影响。这会将所有 A
和 B
对象无限期地保存在内存中,等待它们是否可以配对。要缩短它们在内存中保留的时间,请将 Observable.Never()
调用更改为适当的 windows 以决定每个对象在内存中保留多长时间。