在 Rx 中,为什么什么时候给我过时的元素

In Rx, why does When give me outdated elements

我有一个包含多种通知类型的流。一种通知类型包含有关当前文件的信息并连续发送。当用户单击按钮时会发出另一种类型。两个通知都在一个流中。

当用户单击我想对当前文件执行某些操作的按钮时。我将源流分成两个流,并尝试将 "button click" 与最新的 "current file" 通知结合起来。

static IObservable<DoSomething> DoSomethingWithFile(IObservable<object> stream)
{
  var buttonClick = stream.OfType<ButtonClick>();
  var file = stream.OfType<CurrentFile>();

  var match = buttonClick
    .And(file)
    .Then((command, f) => new DoSomething());

  return Observable.When(match);
}

这是想要的大理石图:

File    1-1-1-1-2-2-2-3-3-3-3
Button  ----x-----------x----

Desired ----1-----------3----
            x           x

该代码通常有效,但如果当前文件信息发生变化,则只能执行几次。我得到的不是上面的弹珠图:

File    1-1-1-1-2-2-2-3-3-3-3
Button  ----x-----------x----

Actual  ----1-----------2----
            x           x

CombineLatest 在这种情况下不起作用,因为在每个新文件上它都会向目标序列发出通知,尽管用户没有单击按钮。

我知道这个问题很笼统,但我们当然是在谈论 a real project :-)

所以 AndPlanPattern 对我来说都是新的。非常酷的东西。根据我对 And 的实验,它似乎缓冲了快速可观察值,直到慢速可观察值发出一个值,此时它 returns 来自快速可观察值的第一个值和来自慢速可观察值的最新值。那可以解释您所看到的行为。我在 MSDN 上找不到任何详细解释 And 工作原理的文档,不幸的是,这可以证实我所看到的。

我想这会给你想要的结果:

static IObservable<DoSomething> DoSomethingWithFile(IObservable<object> stream)
{
    var buttonClick = stream.OfType<ButtonClick>();
    var file = stream.OfType<CurrentFile>();

    return
        file
        .Select(f =>
            buttonClick
            .Select(b => new { File = f, ButtonClick = b })
            //.Take(1)
            // Include this if you only want to register 
            // one button click per file change
            )
        .Switch()
        .Select(x => new DoSomething());
}

Zip、And 或 CombineLatest 是此处要查看的正确运算符。您对文件流使用 DistinctUntilLatest 的更改是一个好的开始。但是,在上面的示例中,即使使用 DistinctUntilChanged,您仍然会收到关于 (2, x) 的通知,即使使用 DistinctUntilChanged。

Zip 将为您推送两个流的每个组合不同值的通知。所以你不能单独使用 Zip 来跳过文件 2 的通知。并且会有相同的行为。 CombineLatest 也会给你通知,其中有 2。

如果你想要你上面画的弹珠图,当你点击按钮时你只从文件流中获取最新的值,你可以使用你这样做的方法:

file.CombineLatest(buttonClick, (file, button) => new { file, button })
    .DistinctUntilChanged(data => data.button)

考虑这个例子,有一个慢流和一个快流。每次慢流滴答时,我们都从快流中获取最新值:

var fast = Observable.Interval(TimeSpan.FromSeconds(1));
var slow = Observable.Interval(TimeSpan.FromSeconds(1.5));

fast.CombineLatest(slow, (fastValue, slowValue) => new { fastValue, slowValue })
    .DistinctUntilChanged(data => data.slowValue)
    .Dump();

如果您注释掉 DistinctUntilChanged,您会看到它会在每次快速源滴答时产生一个值。

RxJS 有一个运算符可以为我们做这件事,叫做 withLatestFrom,但不幸的是,它不在 .Net 版本中。您可以执行 button.withLatestFrom(file) 并从那里开始。