Rx.Net GroupJoin 两个 Observables 与时间在加入条件
Rx.Net GroupJoin two Observables with time in join condition
给定 2 个热可观察对象 t1 和 t2,我将如何 GoupJoin 以便我从 t2 获取 t1 中每个事件之前 x 秒和之后 y 秒发生的所有事件?
鉴于:
t1 -----A-----B-----C
t2 --1--2--3--4--5--6
如果 t1 相隔 2 秒,t2 相隔 1 秒,我们正在寻找每个 t1 事件任一侧相隔 1 秒的 t2 事件,结果如下。
结果:
{ A, [1,2,3] }
{ B, [3,4,5] }
{ C, [5,6] }
下面是真实的例子,我们需要解决上述问题。
我们有一串电子邮件和另一串短信。我们需要发出流的另一个结果,其中有电子邮件和短信发生在电子邮件发送时间之前或之后的 1 分钟内。
代码转储答案(用100毫秒代替1秒):
var t1 = Observable.Interval(TimeSpan.FromMilliseconds(200))
.Select(l => (char)('A' + l))
.Delay(TimeSpan.FromMilliseconds(200));
var t2 = Observable.Interval(TimeSpan.FromMilliseconds(100))
.Delay(TimeSpan.FromMilliseconds(100));
var x = TimeSpan.FromMilliseconds(100); //before time
var y = TimeSpan.FromMilliseconds(100); //after time
var g = t1.Timestamp().Join(t2.Timestamp(),
c => Observable.Timer(y),
i => Observable.Timer(x + y),
(c, i) => new {GroupItem = c, RightItem = i}
)
.Where(a =>
(a.GroupItem.Timestamp > a.RightItem.Timestamp && a.GroupItem.Timestamp - a.RightItem.Timestamp <= x) //group-item came first
|| (a.GroupItem.Timestamp <= a.RightItem.Timestamp && a.RightItem.Timestamp - a.GroupItem.Timestamp <= y) // right-item came first, or exact timestamp match
)
.Select(a => new { GroupItem = a.GroupItem.Value, RightItem = a.RightItem.Value })
.GroupBy(a => a.GroupItem, a => a.RightItem);
解释: Join
就是关于 "windows"。所以当你定义一个join时,你必须考虑从left observable和right observable开始的每个item的开放时间window。我们这里的 window 很难搞清楚:我们必须以某种方式在它发生之前打开一个 window 左边可观察的 X 时间,然后在它发生之后关闭它 Y 时间。
而不是做不可能的事情,所以我们只在左边的项目出现后让它打开 Y 时间,让右边的项目 windows 由 X + Y 时间定义。但是,这会给我们留下不应该包含的项目。所以我们在时间戳上使用 Where
来过滤掉那些。
最后我们 select 找出匿名类型和时间戳并将它们组合在一起。
我不认为 GroupJoin
是解决此问题的方法:您最终会像我所做的那样拆散并重组它..
这里的问题(如 Shlomo 所述)是我们需要在 t1
事件发生之前在 t2
中打开 window。不幸的是,这是不可能的,因为一旦我们到达 t1
中的事件,我们就已经过了需要在 t2
中打开 window 的点。
我们可以做的是使用 Delay() 及时向前移动 t2
。如果我们将其偏移 x
(之前的时间),我们可以将问题重新定义为“获取 t2
中发生在 window 中的事件,开盘时间为 t1
,收盘时间为t1 + x + y
。我们可以使用 GroupJoin 来解决这个问题。
var scheduler = new HistoricalScheduler();
var t1 = Observable.Interval(TimeSpan.FromMilliseconds(200), scheduler)
.Select(l => (char)('A' + l));
var t2 = Observable.Interval(TimeSpan.FromMilliseconds(100), scheduler);
var x = TimeSpan.FromMilliseconds(100); //before time
var y = TimeSpan.FromMilliseconds(100); //after time
var delayedT2 = t2.Delay(x, scheduler);
var g = t1.GroupJoin(delayedT2 ,
_ => Observable.Timer(x + y, scheduler),
_ => Observable.Empty<Unit>(scheduler),
(a, b) => new { a, b}
);
scheduler.Start();
这给出了结果:
{ A, [1,2] }
{ B, [3,4] }
{ C, [5,6] }
这个结果仍然不尽如人意。这是因为在您的示例中 t2
事件发生在完全相同的瞬间 t1
事件。在这种情况下,首先处理 t1 + y
事件并在包含 t2
事件之前关闭 window。这意味着我们实际上得到了 (t1-01:00) <= t1 < (t1 + 01:00)
。例如。 A
的 window 是 01:0000 - 02.9999... 这就是为什么 3
出现在 03:00 不包括在内。
这可以通过简单地在我们的 y
时间
上添加一个刻度来固定为包含在内
var y = TimeSpan.FromMilliseconds(100).Add(TimeSpan.FromTicks(1));
给定 2 个热可观察对象 t1 和 t2,我将如何 GoupJoin 以便我从 t2 获取 t1 中每个事件之前 x 秒和之后 y 秒发生的所有事件?
鉴于:
t1 -----A-----B-----C
t2 --1--2--3--4--5--6
如果 t1 相隔 2 秒,t2 相隔 1 秒,我们正在寻找每个 t1 事件任一侧相隔 1 秒的 t2 事件,结果如下。
结果:
{ A, [1,2,3] }
{ B, [3,4,5] }
{ C, [5,6] }
下面是真实的例子,我们需要解决上述问题。 我们有一串电子邮件和另一串短信。我们需要发出流的另一个结果,其中有电子邮件和短信发生在电子邮件发送时间之前或之后的 1 分钟内。
代码转储答案(用100毫秒代替1秒):
var t1 = Observable.Interval(TimeSpan.FromMilliseconds(200))
.Select(l => (char)('A' + l))
.Delay(TimeSpan.FromMilliseconds(200));
var t2 = Observable.Interval(TimeSpan.FromMilliseconds(100))
.Delay(TimeSpan.FromMilliseconds(100));
var x = TimeSpan.FromMilliseconds(100); //before time
var y = TimeSpan.FromMilliseconds(100); //after time
var g = t1.Timestamp().Join(t2.Timestamp(),
c => Observable.Timer(y),
i => Observable.Timer(x + y),
(c, i) => new {GroupItem = c, RightItem = i}
)
.Where(a =>
(a.GroupItem.Timestamp > a.RightItem.Timestamp && a.GroupItem.Timestamp - a.RightItem.Timestamp <= x) //group-item came first
|| (a.GroupItem.Timestamp <= a.RightItem.Timestamp && a.RightItem.Timestamp - a.GroupItem.Timestamp <= y) // right-item came first, or exact timestamp match
)
.Select(a => new { GroupItem = a.GroupItem.Value, RightItem = a.RightItem.Value })
.GroupBy(a => a.GroupItem, a => a.RightItem);
解释: Join
就是关于 "windows"。所以当你定义一个join时,你必须考虑从left observable和right observable开始的每个item的开放时间window。我们这里的 window 很难搞清楚:我们必须以某种方式在它发生之前打开一个 window 左边可观察的 X 时间,然后在它发生之后关闭它 Y 时间。
而不是做不可能的事情,所以我们只在左边的项目出现后让它打开 Y 时间,让右边的项目 windows 由 X + Y 时间定义。但是,这会给我们留下不应该包含的项目。所以我们在时间戳上使用 Where
来过滤掉那些。
最后我们 select 找出匿名类型和时间戳并将它们组合在一起。
我不认为 GroupJoin
是解决此问题的方法:您最终会像我所做的那样拆散并重组它..
这里的问题(如 Shlomo 所述)是我们需要在 t1
事件发生之前在 t2
中打开 window。不幸的是,这是不可能的,因为一旦我们到达 t1
中的事件,我们就已经过了需要在 t2
中打开 window 的点。
我们可以做的是使用 Delay() 及时向前移动 t2
。如果我们将其偏移 x
(之前的时间),我们可以将问题重新定义为“获取 t2
中发生在 window 中的事件,开盘时间为 t1
,收盘时间为t1 + x + y
。我们可以使用 GroupJoin 来解决这个问题。
var scheduler = new HistoricalScheduler();
var t1 = Observable.Interval(TimeSpan.FromMilliseconds(200), scheduler)
.Select(l => (char)('A' + l));
var t2 = Observable.Interval(TimeSpan.FromMilliseconds(100), scheduler);
var x = TimeSpan.FromMilliseconds(100); //before time
var y = TimeSpan.FromMilliseconds(100); //after time
var delayedT2 = t2.Delay(x, scheduler);
var g = t1.GroupJoin(delayedT2 ,
_ => Observable.Timer(x + y, scheduler),
_ => Observable.Empty<Unit>(scheduler),
(a, b) => new { a, b}
);
scheduler.Start();
这给出了结果:
{ A, [1,2] }
{ B, [3,4] }
{ C, [5,6] }
这个结果仍然不尽如人意。这是因为在您的示例中 t2
事件发生在完全相同的瞬间 t1
事件。在这种情况下,首先处理 t1 + y
事件并在包含 t2
事件之前关闭 window。这意味着我们实际上得到了 (t1-01:00) <= t1 < (t1 + 01:00)
。例如。 A
的 window 是 01:0000 - 02.9999... 这就是为什么 3
出现在 03:00 不包括在内。
这可以通过简单地在我们的 y
时间
var y = TimeSpan.FromMilliseconds(100).Add(TimeSpan.FromTicks(1));