Rx.Net GroupJoin 两个 Observables 与时间在加入条件

Rx.Net GroupJoin two Observables with time in join condition

给定 2 个热可观察对象 t1 和 t2,我将如何 GoupJoin 以便我从 t2 获取 t1 中每个事件之前 x 秒和之后 y 秒发生的所有事件?

鉴于:

t1 -----A-----B-----C

t2 --1--2--3--4--5--6

如果 t1 相隔 2 秒,t2 相隔 1 秒,我们正在寻找每个 t1 事件任一侧相隔 1 秒的 t2 事件,结果如下。

结果:

{ A, [1,2,3] }

{ B, [3,4,5] }

{ C, [5,6] }

下面是真实的例子,我们需要解决上述问题。 我们有一串电子邮件和另一串短信。我们需要发出流的另一个结果,其中有电子邮件和短信发生在电子邮件发送时间之前或之后的 1 分钟内。

代码转储答案(用100毫秒代替1秒):

var t1 = Observable.Interval(TimeSpan.FromMilliseconds(200))
    .Select(l => (char)('A' + l))
    .Delay(TimeSpan.FromMilliseconds(200));
var t2 = Observable.Interval(TimeSpan.FromMilliseconds(100))
    .Delay(TimeSpan.FromMilliseconds(100));

var x = TimeSpan.FromMilliseconds(100);     //before time
var y = TimeSpan.FromMilliseconds(100);     //after time

var g = t1.Timestamp().Join(t2.Timestamp(),
    c => Observable.Timer(y),
    i => Observable.Timer(x + y),
    (c, i) => new {GroupItem = c, RightItem = i}
)
    .Where(a =>
        (a.GroupItem.Timestamp > a.RightItem.Timestamp && a.GroupItem.Timestamp - a.RightItem.Timestamp <= x) //group-item came first
        || (a.GroupItem.Timestamp <= a.RightItem.Timestamp && a.RightItem.Timestamp - a.GroupItem.Timestamp <= y) // right-item came first, or exact timestamp match
    )
    .Select(a => new { GroupItem = a.GroupItem.Value, RightItem = a.RightItem.Value })
    .GroupBy(a => a.GroupItem, a => a.RightItem);

解释: Join 就是关于 "windows"。所以当你定义一个join时,你必须考虑从left observable和right observable开始的每个item的开放时间window。我们这里的 window 很难搞清楚:我们必须以某种方式在它发生之前打开一个 window 左边可观察的 X 时间,然后在它发生之后关闭它 Y 时间。

而不是做不可能的事情,所以我们只在左边的项目出现后让它打开 Y 时间,让右边的项目 windows 由 X + Y 时间定义。但是,这会给我们留下不应该包含的项目。所以我们在时间戳上使用 Where 来过滤掉那些。

最后我们 select 找出匿名类型和时间戳并将它们组合在一起。

我不认为 GroupJoin 是解决此问题的方法:您最终会像我所做的那样拆散并重组它..

这里的问题(如 Shlomo 所述)是我们需要在 t1 事件发生之前在 t2 中打开 window。不幸的是,这是不可能的,因为一旦我们到达 t1 中的事件,我们就已经过了需要在 t2 中打开 window 的点。

我们可以做的是使用 Delay() 及时向前移动 t2。如果我们将其偏移 x(之前的时间),我们可以将问题重新定义为“获取 t2 中发生在 window 中的事件,开盘时间为 t1,收盘时间为t1 + x + y。我们可以使用 GroupJoin 来解决这个问题。

var scheduler = new HistoricalScheduler();

var t1 = Observable.Interval(TimeSpan.FromMilliseconds(200), scheduler)
    .Select(l => (char)('A' + l));
var t2 = Observable.Interval(TimeSpan.FromMilliseconds(100), scheduler);

var x = TimeSpan.FromMilliseconds(100);     //before time
var y = TimeSpan.FromMilliseconds(100);     //after time

var delayedT2 = t2.Delay(x, scheduler);

var g = t1.GroupJoin(delayedT2 ,
    _ => Observable.Timer(x + y, scheduler),
    _ => Observable.Empty<Unit>(scheduler),
    (a, b) => new { a, b}
);

scheduler.Start();

这给出了结果:

{ A, [1,2] }
{ B, [3,4] }
{ C, [5,6] }

这个结果仍然不尽如人意。这是因为在您的示例中 t2 事件发生在完全相同的瞬间 t1 事件。在这种情况下,首先处理 t1 + y 事件并在包含 t2 事件之前关闭 window。这意味着我们实际上得到了 (t1-01:00) <= t1 < (t1 + 01:00)。例如。 A 的 window 是 01:0000 - 02.9999... 这就是为什么 3 出现在 03:00 不包括在内。

这可以通过简单地在我们的 y 时间

上添加一个刻度来固定为包含在内
var y = TimeSpan.FromMilliseconds(100).Add(TimeSpan.FromTicks(1));