如何限制事件 RX?

How to throttle events RX?

我正在尝试限制当值已更改时由滑块触发的事件我想限制到 1 秒,它每秒打印一次,但打印的数据不正确集合中的所有值都在同时打印,但我想每秒打印一次该唯一数据类型的最后一个值,结果发布在下面。

[完全可重现的代码] XAML:

<Grid>
    <TextBox x:Name="txtbilly" Text="{Binding Position, Mode=TwoWay , UpdateSourceTrigger=PropertyChanged}" Margin="0,0,0,382" ></TextBox>
    <TextBox x:Name="txtbob" Text="{Binding Position, Mode=TwoWay , UpdateSourceTrigger=PropertyChanged}" Margin="0,162,0,233" ></TextBox>
    <Slider x:Name="slbilly"  Value="{Binding Position, Mode=TwoWay , UpdateSourceTrigger=PropertyChanged}" Margin="0,57,0,306" GotMouseCapture="slbilly_GotMouseCapture"/>
    <Slider x:Name="slbob"  Value="{Binding Position, Mode=TwoWay, UpdateSourceTrigger=PropertyChanged}" Margin="0,206,0,171" GotMouseCapture="slbob_GotMouseCapture" />
</Grid>

后面的代码:

public partial class MainWindow : Window
{
    Student billy = new Student("Unique1", 0.0f);
    Student Bob = new Student("Unique2", 0.0f);
    public MainWindow()
    {
        InitializeComponent();
        txtbilly.DataContext = billy;
        slbilly.DataContext = billy;
        billy.PropertyChanged += Students_PropertyChanged;

        txtbob.DataContext = Bob;
        slbob.DataContext = Bob;
        Bob.PropertyChanged += Students_PropertyChanged;

    }
    static IList<Student> studentsList = new List<Student>();
    IObservable<Student> observable;
    private void Students_PropertyChanged(object sender, System.ComponentModel.PropertyChangedEventArgs e)
    {
        var student = (Student)sender;
        studentsList.Add(student);
        Debug.Print("List COunt ==>>>" + studentsList.Count.ToString() + "\n");

        observable = studentsList.ToObservable(Scheduler.Default);

        observable.Throttle(TimeSpan.FromSeconds(1), Scheduler.Default);
    }

    private void slbilly_GotMouseCapture(object sender, MouseEventArgs e)
    {
        observable?.Subscribe(i => Debug.Print("{0}\nPos {1}\n Time Received {2}\nOBS Count{3}", i?.ID?.ToString(), i.Position.ToString(), DateTime.Now.ToString(), observable.Count<Student>().ToString()));
    }


    private void slbob_GotMouseCapture(object sender, MouseEventArgs e)
    {

        observable?.Subscribe(i => Debug.Print("{0}\nPos {1}\n Time Received {2}\nOBS Count{3}", i?.ID?.ToString(), i.Position.ToString(), DateTime.Now.ToString(), observable.Count<Student>().ToString()));
    }
}

型号:

public class Student : INotifyPropertyChanged
{
    private string _ID;
    public string ID
    {
        get
        {
            return _ID;
        }
        set
        {
            _ID = value;
            OnPropertyChanged(new PropertyChangedEventArgs("ID"));
        }
    }
    private float _Position;
    public float Position
    {
        get
        {
            return _Position;
        }
        set
        {
            _Position = value;
            OnPropertyChanged(new PropertyChangedEventArgs("Position"));
        }
    }
    public Student(string id, float position)
    {

        this.Position = position;
        this.ID = id;

    }

    public event PropertyChangedEventHandler PropertyChanged;
    public void OnPropertyChanged(PropertyChangedEventArgs e)
    {
        if (PropertyChanged != null)
        {
            PropertyChanged(this, e);
        }
    }
}

结果:

  1. Unique1 Pos 0.06468305 Time Received 18/04/2022 00:16:05

  2. Unique1 Pos 0.06468305 Time Received 18/04/2022 00:16:05 OBS

  3. Unique1 Pos 0.06468305 Time Received 18/04/2022 00:16:05 OBS

  4. Unique1 Pos 0.06468305 Time Received 18/04/2022 00:16:05 OBS

  5. Unique1 Pos 0.06468305 Time Received 18/04/2022 00:16:05 OBS

预期:如果用户从 0.0 开始并在 1 秒内从 0.0 拖到 300.98,然后拖到 200.45 2 秒:

Unique1 Pos 0.0 Time Received 18/04/2022 00:16:05

Unique1 Pos 300.98 Time Received 18/04/2022 00:16:06

Unique1 Pos 200.45 Time Received 18/04/2022 00:16:07

问题:

  1. studentsList counts keeps growing and not reset by the observable(每次使用从控件中释放鼠标时,它应该重置列表。

  2. InvalidOperationException: 'Collection was modified; enumeration operation may not execute.'

  3. 一旦我从拖动动作中释放鼠标,Debug.Print 只会得到 printed/execute

@Enigmativity 回答解决了我的第四个已知问题。

我是 Rx 技术的新手,所以我不确定我是否以正确的方式实施它。

基本上,如果更改的值来自同一个控件,我想限制它,例如在滑块控件上当用户单击旋钮拖动时开始节流一旦用户释放鼠标单击停止节流

你有这条线:

observable.Throttle(TimeSpan.FromSeconds(1), Scheduler.Default);

应该是:

observable = observable.Throttle(TimeSpan.FromSeconds(1), Scheduler.Default);

Rx 中的运算符很流畅。您从 Throttle 运算符获得了一个新的可观察对象。没有分配 observable 保持不变。

为了向您展示您可能需要什么才能让您的 observable 工作,它应该看起来像这样:

var observable =
    Observable
        .Merge(
            Observable.FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(h => billy.PropertyChanged += h, h => billy.PropertyChanged += h).Select(x => billy),
            Observable.FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(h => Bob.PropertyChanged += h, h => Bob.PropertyChanged += h).Select(x => Bob))
        .Throttle(TimeSpan.FromSeconds(5.0))
        .ObserveOn(SynchronizationContext.Current)
        .Do(s =>
        {
            studentsList.Add(s);
            Debug.Print($"List Count ==>>>{studentsList.Count}{Environment.NewLine}");
        })
        .Select(s => Observable.FromAsync(() => ExpensiveHttpCall(s)).Select(u => new { Student = s, Result = u }))
        .Switch();

和订阅,像这样:

_subscription =
    observable
    .ObserveOn(SynchronizationContext.Current)
        .Subscribe(x =>
        {
            Debug.Print($"{x.Student.ID}{Environment.NewLine}Pos {x.Student.Position}{Environment.NewLine}Time Received {DateTime.Now.ToString()}{Environment.NewLine}List Count{studentsList.Count()}");
        });

我添加了这段代码来模拟 long-running HTTP 调用:

    private async Task<Unit> ExpensiveHttpCall(Student student)
    {
        await Task.Delay(TimeSpan.FromSeconds(5.0));
        return Unit.Default;
    }

现在我不知道你想用这个查询做什么,但这应该有助于你朝着正确的方向前进。

Rx 的关键是尝试将您的查询简化为您订阅一次的单个查询。它让生活更轻松。

完整代码如下:

public partial class MainWindow : Window
{
    Student billy = new Student("Unique1", 0.0f);
    Student Bob = new Student("Unique2", 0.0f);
    public MainWindow()
    {
        InitializeComponent();
        txtbilly.DataContext = billy;
        slbilly.DataContext = billy;

        txtbob.DataContext = Bob;
        slbob.DataContext = Bob;

        var observable =
            Observable
                .Merge(
                    Observable.FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(h => billy.PropertyChanged += h, h => billy.PropertyChanged += h).Select(x => billy),
                    Observable.FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(h => Bob.PropertyChanged += h, h => Bob.PropertyChanged += h).Select(x => Bob))
                .Throttle(TimeSpan.FromSeconds(5.0))
                .ObserveOn(SynchronizationContext.Current)
                .Do(s =>
                {
                    studentsList.Add(s);
                    Debug.Print($"List Count ==>>>{studentsList.Count}{Environment.NewLine}");
                })
                .Select(s => Observable.FromAsync(() => ExpensiveHttpCall(s)).Select(u => new { Student = s, Result = u }))
                .Switch();

        _subscription =
            observable
            .ObserveOn(SynchronizationContext.Current)
                .Subscribe(x =>
                {
                    Debug.Print($"{x.Student.ID}{Environment.NewLine}Pos {x.Student.Position}{Environment.NewLine}Time Received {DateTime.Now.ToString()}{Environment.NewLine}List Count{studentsList.Count()}");
                });
    }

    private IDisposable? _subscription = null;

    private async Task<Unit> ExpensiveHttpCall(Student student)
    {
        await Task.Delay(TimeSpan.FromSeconds(5.0));
        return Unit.Default;
    }

    static IList<Student> studentsList = new List<Student>();
}