在 Rx 中,为什么什么时候给我过时的元素
In Rx, why does When give me outdated elements
我有一个包含多种通知类型的流。一种通知类型包含有关当前文件的信息并连续发送。当用户单击按钮时会发出另一种类型。两个通知都在一个流中。
当用户单击我想对当前文件执行某些操作的按钮时。我将源流分成两个流,并尝试将 "button click" 与最新的 "current file" 通知结合起来。
static IObservable<DoSomething> DoSomethingWithFile(IObservable<object> stream)
{
var buttonClick = stream.OfType<ButtonClick>();
var file = stream.OfType<CurrentFile>();
var match = buttonClick
.And(file)
.Then((command, f) => new DoSomething());
return Observable.When(match);
}
这是想要的大理石图:
File 1-1-1-1-2-2-2-3-3-3-3
Button ----x-----------x----
Desired ----1-----------3----
x x
该代码通常有效,但如果当前文件信息发生变化,则只能执行几次。我得到的不是上面的弹珠图:
File 1-1-1-1-2-2-2-3-3-3-3
Button ----x-----------x----
Actual ----1-----------2----
x x
CombineLatest
在这种情况下不起作用,因为在每个新文件上它都会向目标序列发出通知,尽管用户没有单击按钮。
我知道这个问题很笼统,但我们当然是在谈论 a real project :-)
所以 And
、Plan
和 Pattern
对我来说都是新的。非常酷的东西。根据我对 And
的实验,它似乎缓冲了快速可观察值,直到慢速可观察值发出一个值,此时它 returns 来自快速可观察值的第一个值和来自慢速可观察值的最新值。那可以解释您所看到的行为。我在 MSDN 上找不到任何详细解释 And
工作原理的文档,不幸的是,这可以证实我所看到的。
我想这会给你想要的结果:
static IObservable<DoSomething> DoSomethingWithFile(IObservable<object> stream)
{
var buttonClick = stream.OfType<ButtonClick>();
var file = stream.OfType<CurrentFile>();
return
file
.Select(f =>
buttonClick
.Select(b => new { File = f, ButtonClick = b })
//.Take(1)
// Include this if you only want to register
// one button click per file change
)
.Switch()
.Select(x => new DoSomething());
}
Zip、And 或 CombineLatest 是此处要查看的正确运算符。您对文件流使用 DistinctUntilLatest 的更改是一个好的开始。但是,在上面的示例中,即使使用 DistinctUntilChanged,您仍然会收到关于 (2, x) 的通知,即使使用 DistinctUntilChanged。
Zip 将为您推送两个流的每个组合不同值的通知。所以你不能单独使用 Zip 来跳过文件 2 的通知。并且会有相同的行为。 CombineLatest 也会给你通知,其中有 2。
如果你想要你上面画的弹珠图,当你点击按钮时你只从文件流中获取最新的值,你可以使用你这样做的方法:
file.CombineLatest(buttonClick, (file, button) => new { file, button })
.DistinctUntilChanged(data => data.button)
考虑这个例子,有一个慢流和一个快流。每次慢流滴答时,我们都从快流中获取最新值:
var fast = Observable.Interval(TimeSpan.FromSeconds(1));
var slow = Observable.Interval(TimeSpan.FromSeconds(1.5));
fast.CombineLatest(slow, (fastValue, slowValue) => new { fastValue, slowValue })
.DistinctUntilChanged(data => data.slowValue)
.Dump();
如果您注释掉 DistinctUntilChanged,您会看到它会在每次快速源滴答时产生一个值。
RxJS 有一个运算符可以为我们做这件事,叫做 withLatestFrom,但不幸的是,它不在 .Net 版本中。您可以执行 button.withLatestFrom(file) 并从那里开始。
我有一个包含多种通知类型的流。一种通知类型包含有关当前文件的信息并连续发送。当用户单击按钮时会发出另一种类型。两个通知都在一个流中。
当用户单击我想对当前文件执行某些操作的按钮时。我将源流分成两个流,并尝试将 "button click" 与最新的 "current file" 通知结合起来。
static IObservable<DoSomething> DoSomethingWithFile(IObservable<object> stream)
{
var buttonClick = stream.OfType<ButtonClick>();
var file = stream.OfType<CurrentFile>();
var match = buttonClick
.And(file)
.Then((command, f) => new DoSomething());
return Observable.When(match);
}
这是想要的大理石图:
File 1-1-1-1-2-2-2-3-3-3-3
Button ----x-----------x----
Desired ----1-----------3----
x x
该代码通常有效,但如果当前文件信息发生变化,则只能执行几次。我得到的不是上面的弹珠图:
File 1-1-1-1-2-2-2-3-3-3-3
Button ----x-----------x----
Actual ----1-----------2----
x x
CombineLatest
在这种情况下不起作用,因为在每个新文件上它都会向目标序列发出通知,尽管用户没有单击按钮。
我知道这个问题很笼统,但我们当然是在谈论 a real project :-)
所以 And
、Plan
和 Pattern
对我来说都是新的。非常酷的东西。根据我对 And
的实验,它似乎缓冲了快速可观察值,直到慢速可观察值发出一个值,此时它 returns 来自快速可观察值的第一个值和来自慢速可观察值的最新值。那可以解释您所看到的行为。我在 MSDN 上找不到任何详细解释 And
工作原理的文档,不幸的是,这可以证实我所看到的。
我想这会给你想要的结果:
static IObservable<DoSomething> DoSomethingWithFile(IObservable<object> stream)
{
var buttonClick = stream.OfType<ButtonClick>();
var file = stream.OfType<CurrentFile>();
return
file
.Select(f =>
buttonClick
.Select(b => new { File = f, ButtonClick = b })
//.Take(1)
// Include this if you only want to register
// one button click per file change
)
.Switch()
.Select(x => new DoSomething());
}
Zip、And 或 CombineLatest 是此处要查看的正确运算符。您对文件流使用 DistinctUntilLatest 的更改是一个好的开始。但是,在上面的示例中,即使使用 DistinctUntilChanged,您仍然会收到关于 (2, x) 的通知,即使使用 DistinctUntilChanged。
Zip 将为您推送两个流的每个组合不同值的通知。所以你不能单独使用 Zip 来跳过文件 2 的通知。并且会有相同的行为。 CombineLatest 也会给你通知,其中有 2。
如果你想要你上面画的弹珠图,当你点击按钮时你只从文件流中获取最新的值,你可以使用你这样做的方法:
file.CombineLatest(buttonClick, (file, button) => new { file, button })
.DistinctUntilChanged(data => data.button)
考虑这个例子,有一个慢流和一个快流。每次慢流滴答时,我们都从快流中获取最新值:
var fast = Observable.Interval(TimeSpan.FromSeconds(1));
var slow = Observable.Interval(TimeSpan.FromSeconds(1.5));
fast.CombineLatest(slow, (fastValue, slowValue) => new { fastValue, slowValue })
.DistinctUntilChanged(data => data.slowValue)
.Dump();
如果您注释掉 DistinctUntilChanged,您会看到它会在每次快速源滴答时产生一个值。
RxJS 有一个运算符可以为我们做这件事,叫做 withLatestFrom,但不幸的是,它不在 .Net 版本中。您可以执行 button.withLatestFrom(file) 并从那里开始。