Controlling (start/stop) IObservable through ToggleSwitch using ReactiveUI

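I am trying to hook up a stream (IObservable) so that it can be controlled through a ToggleSwitch in a UWP project. The expectation is that streaming starts when the switch is in the On state and stops when it is in the Off state.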

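So my idea is:
1. Create two commands, one to start the stream and another to stop it.
2. Create two observables that monitor the switch state and call InvokeCommand when the condition is right.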

ViewModel

public class MainPageViewModel : ViewModelBase
{
    public ReactiveCommand<Unit, (long, float)> StreamCommand { get; }
    public ReactiveCommand<Unit, Unit> StopCommand { get; }
    public IObservable<(long, float)> FlowStream { get; set; }

    private bool _isStreamOn;

    public bool IsStreamOn
    {
        get => _isStreamOn;
        set => this.RaiseAndSetIfChanged(ref _isStreamOn, value);
    }

    public MainPageViewModel()
    {
        var stream = GetStream();

        var canSwitchOn = this.WhenAnyValue(x => x.IsStreamOn);
        var canSwitchOff = this.WhenAnyValue(x => x.IsStreamOn, isOn => isOn != true);

        FlowStream = StreamCommand = ReactiveCommand.CreateFromObservable(
            () =>
                {
                    stream.Start();
                    return Observable.FromEventPattern<StreamDataEventArgs<(long, INumeric, INumeric, INumeric)>>(
                            h => stream.DataAvailable += h,
                            h => stream.DataAvailable -= h)
                        .SelectMany(e => e.EventArgs.Data)
                        .Select(item => item);
                }, canSwitchOn);

        StopCommand = ReactiveCommand.Create(
            () =>
            {
                stream.Stop();
                IsStreamOn = false;
            }, canSwitchOff);

        canSwitchOff.InvokeCommand(StopCommand);
        canSwitchOn.InvokeCommand(StreamCommand);
    }

}

View

public sealed partial class MainPage : Page, IViewFor<MainPageViewModel>
{
    public MainPage()
    {
        InitializeComponent();
        NavigationCacheMode = Windows.UI.Xaml.Navigation.NavigationCacheMode.Enabled;
        ViewModel = new MainPageViewModel();

        this.WhenActivated(subscription =>
        {
            subscription(this.OneWayBind(this.ViewModel,
                vm => vm.StreamCommand,
                v => v.chart.SeriesCollection[0].Stream)); // Chart take care of displaying data

            subscription(this.Bind(this.ViewModel,
                vm => vm.IsStreamOn,
                v => v.streamToggle.IsOn));
        });
    }

    object IViewFor.ViewModel
    {
        get { return ViewModel; }
        set { ViewModel = (MainPageViewModel)value; }
    }

    public MainPageViewModel ViewModel
    {
        get { return (MainPageViewModel)GetValue(ViewModelProperty); }
        set { SetValue(ViewModelProperty, value); }
    }

    public static readonly DependencyProperty ViewModelProperty =
        DependencyProperty.Register("ViewModel", typeof(MainPageViewModel), typeof(MainPage), null);
}

However, InvokeCommand fails because it requires the ReactiveCommands to take a bool rather than Unit. Any idea how I can invoke a command when certain conditions are met?

If you want to turn a stream (IObservable<(long, float)> FlowStream) on and off based on an IObservable<bool> IsStreamOn observable, then you can do this:

IObservable<(long, float)> outputStream =
    IsStreamOn
        .Select(flag => flag ? FlowStream : Observable.Never<(long, float)>())
        .Switch();

So each time IsStreamOn produces true you start getting values from FlowStream; when it produces false the values stop.

This assumes that FlowStream is hot. If it is not, do this instead:

IObservable<(long, float)> outputStream =
    FlowStream
        .Publish(fs =>
            IsStreamOn
                .Select(flag => flag ? fs : Observable.Never<(long, float)>())
                .Switch());

Here is a simple test:

void Main()
{
    IObservable<long> outputStream =
        FlowStream
            .Publish(fs =>
                IsStreamOn
                    .Select(flag => flag ? fs : Observable.Never<long>())
                    .Switch());

    using (outputStream.Subscribe(Console.WriteLine))
    {
        IsStreamOn.OnNext(true);
        Thread.Sleep(TimeSpan.FromSeconds(2.5));
        IsStreamOn.OnNext(false);
        Thread.Sleep(TimeSpan.FromSeconds(3.0));
        IsStreamOn.OnNext(true);
        Thread.Sleep(TimeSpan.FromSeconds(3.0));
    }

}

IObservable<long> FlowStream = Observable.Interval(TimeSpan.FromSeconds(1.0));
Subject<bool> IsStreamOn = new Subject<bool>();

This produces:

0
1
5
6
7

Given the comments about actually calling .Start() and .Stop(), try something like this:

IObservable<(long, float)> outputStream =
    Observable
        .Create<(long, float)>(o =>
        {
            var stream = GetStream();
            return 
                FlowStream
                    .Publish(fs =>
                        IsStreamOn
                            .Do(flag => { if (flag) stream.Start(); else stream.Stop(); })
                            .Select(flag => flag ? fs : Observable.Never<(long, float)>())
                            .Switch())
                    .Subscribe(o);
        });

In scenarios like this with your observables, I tend to do:

var canSwitchOn = this.WhenAnyValue(x => x.IsStreamOn).Select(_ => Unit.Default);

This lets you avoid passing a bool to the command.

Oh, and you may also need a Where() clause if you want to trigger the command only under the right condition.

For example:

var switchOn = this.WhenAnyValue(x => x.IsStreamOn).Where(x => x).Select(_ => Unit.Default);
var switchOff = this.WhenAnyValue(x => x.IsStreamOn).Where(x => !x).Select(_ => Unit.Default);
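
To show how those two filtered observables might be wired to the commands, here is a minimal sketch. It assumes a stripped-down view model: the class name ToggleViewModel, the StartCommand name, and the Console.WriteLine bodies are placeholders for illustration, not the asker's real stream code.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using ReactiveUI;

// Hypothetical, simplified view model; the command bodies stand in
// for the real stream.Start() / stream.Stop() calls.
public class ToggleViewModel : ReactiveObject
{
    private bool _isStreamOn;

    public bool IsStreamOn
    {
        get => _isStreamOn;
        set => this.RaiseAndSetIfChanged(ref _isStreamOn, value);
    }

    public ReactiveCommand<Unit, Unit> StartCommand { get; }
    public ReactiveCommand<Unit, Unit> StopCommand { get; }

    public ToggleViewModel()
    {
        StartCommand = ReactiveCommand.Create(() => Console.WriteLine("start"));
        StopCommand = ReactiveCommand.Create(() => Console.WriteLine("stop"));

        // Fires StartCommand only when the switch flips to true.
        // Select maps the bool to Unit so the element type matches
        // the command's parameter type.
        this.WhenAnyValue(x => x.IsStreamOn)
            .Where(isOn => isOn)
            .Select(_ => Unit.Default)
            .InvokeCommand(StartCommand);

        // Fires StopCommand only when the switch is false.
        // Note: WhenAnyValue emits the current value on subscription,
        // so with an initial value of false this pipeline also runs
        // once at construction time.
        this.WhenAnyValue(x => x.IsStreamOn)
            .Where(isOn => !isOn)
            .Select(_ => Unit.Default)
            .InvokeCommand(StopCommand);
    }
}
```

In a real view model the subscriptions returned by InvokeCommand would normally be disposed along with the view model; that is left out here to keep the sketch short.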