在 RX 中切换流:Sodium 相当于 RX 中的合并和切换

Switching streams in RX: Sodium's equivalent of merge and switch in RX

talk 第 31 分钟解释的电视频道问题如何通过 RX 解决?

Rx表达的问题如下:

有两个电视频道(channel1channel2)传输图像流,外加 fuzz 流,表示无频道或白噪声。

有两个按钮在按下时发送事件 eButton1eButton2

按下这些按钮应该会导致相应的频道被发送到屏幕。

每次按下按钮都应该投射(映射)到相应的频道,然后所有频道组合成一个选择流,作为以 fuzz 流开始的流。最后,切换运算符将选定的流发送到 screen.

问题

Sodiums 在 RX 中的切换和合并相当于什么?

是否可以用纯高阶函数求解? IE。不使用闭包?我不明白这怎么可能。

Switch and Merge 两者都存在于核心 Rx 库中,很高兴幻灯片中的代码实际上几乎逐字逐句地翻译成 Rx。

Switch 运算符作用于流的流 - 在 Rx 中,这是一种类型 IObservable<IObservable<T>>

Switch 将这串流变平,只将最近的流发送到它的输出,所以你最终得到一个 IObservable<T>

请参阅下面的 C# 示例。我已经尽可能重复使用了演讲中的变量名,所以这应该很容易理解。

唯一(非常轻微)不同的是 hold 函数被替换为等效的 Rx StartWith

包含 nuget 包 Rx-Main 和 运行 作为控制台应用程序。该代码订阅 screen 流并开始将帧从 "Fuzz" 通道渲染到控制台。它会提示您输入频道号。输入 1 或 2,您将看到输出切换到相应通道的帧。

// helper method to create channels
private static IObservable<string> CreateChannelStream(
    string name, CompositeDisposable disposables)
{
    // this hacks together a demo channel stream -
    // a stream of "frames" for the channel
    // for simplicity rather than using images, I use a string
    // message for each frame
    // how it works isn't important, just know you'll get a
    // message event every second
    var channel = Observable.Interval(TimeSpan.FromSeconds(1))
                            .Select(x => name + " Frame: " + x)
                            .Publish();
    disposables.Add(channel.Connect());
    return channel;
}

public static void Main()
{       
    // for cleaning up the hot channel streams
    var disposable = new CompositeDisposable();

    // some channels
    var fuzz = CreateChannelStream("Fuzz", disposable);
    var channel1 = CreateChannelStream("Channel1", disposable);
    var channel2 = CreateChannelStream("Channel2", disposable);

    // the button press event streams
    var eButton1 = new Subject<Unit>();
    var eButton2 = new Subject<Unit>();

    // the button presses are projected to
    // the respective channel streams
    // note, you could obtain the channel via a function call here
    // if you wanted to - to keep it close to the slides I'm not.
    var eChan1 = eButton1.Select(_ => channel1);
    var eChan2 = eButton2.Select(_ => channel2);

    // create the selection "stream of streams"
    // an IObservable<IObservable<string>> here
    // that starts with "fuzz"
    var sel = Observable.Merge(eChan1, eChan2).StartWith(fuzz);

    // flatten and select the most recent stream with Switch
    var screen = sel.Switch();

    // subscribe to the screen and print the frames
    // it will start with "fuzz"
    disposable.Add(screen.Subscribe(Console.WriteLine));

    bool quit = false;

    // a little test loop
    // entering 1 or 2 will switch
    // to that channel
    while(!quit)
    {
        var chan = Console.ReadLine();
        switch (chan.ToUpper())
        {
            case "1":
                // raise a button 1 event
                eButton1.OnNext(Unit.Default);
                break;
            case "2":
                // raise a button 2 event
                eButton2.OnNext(Unit.Default);
                break;  
            case "Q":
                quit = true;
                break;                
        }
    }         

    disposable.Dispose();
}

这样的做法对吗:

IObservable<System.Drawing.Image> fuzz = ...
IObservable<System.Drawing.Image> channel1 = ...
IObservable<System.Drawing.Image> channel2 = ...

IObservable<string> eButton1 = ... // produces string "eButton1" when clicked
IObservable<string> eButton2 = ... // produces string "eButton2" when clicked

var output =
(
    from button in eButton1.Merge(eButton2).StartWith("")
    select
        button == "eButton1"
            ? channel1
            : (button == "eButton2"
                ? channel2 
                : fuzz)
).Switch();