如何将 Observable 与状态配对?

How to Pair an Observable with State?

我正在学习 Rx.Net,我正在构建一个以多个间隔运行的调度程序。我的想法是我有一组间隔和一组命令,我将它们组合在一起并订阅合并结果。每个命令都将与其各自的间隔相关联,并在与其关联定义的间隔(节奏)上执行。

虽然我能够将间隔合并在一起,但我费了好大的劲才想出如何将此命令(状态)传递给订阅。 我确实看到了 Scan 函数,但这看起来是聚合而不是 couple/tuple。这是我的代码:

var intervals = new[]
                {
                    TimeSpan.FromSeconds(30),
                    TimeSpan.FromSeconds(60),
                    TimeSpan.FromSeconds(90)
                };
var commands = new Action[]
                {
                    () => Console.WriteLine("30 Seconds!"),
                    () => Console.WriteLine("60 Seconds!"),
                    () => Console.WriteLine("90 Seconds!")
                };

intervals.Zip(commands, ValueTuple.Create)
         .Select(x => Observable.Interval(x.Item1)) // <-- Need magic here. :)
         .Merge()
         .Subscribe(x =>
                    {
                        x.Item2(); // Broken, x is a long.
                    });

Rx 中有没有一种方法可以实现我在这里想要实现的目标?类似于 Scan 但不是累加器,而是元组(如果这是一个词)。

如果您希望以 30、60、90 秒的间隔重复此操作:

intervals.Zip(commands, ValueTuple.Create)
         .Select(x => Observable.Interval(x.Item1).Select(_ => x))
         .Merge()
         .Subscribe(x =>
                    {
                        x.Item2(); // Broken, x is a long.
                    });

如果您只想触发一次操作:

intervals.Zip(commands, ValueTuple.Create)
         .Select(x => Observable.Delay(Observable.Return(x), x.Item1)) 
         .Merge()
         .Subscribe(x =>
                    {
                        x.Item2(); // Broken, x is a long.
                });

我将为您需要的查询使用稍微不同的版本。

现在,您代码的查询部分(.Merge() 之前的位(包括 .Merge() 仅选择命令 - 并且您依赖订阅者执行命令。您信任订阅者为您执行命令。在某些情况下这可能是正确的逻辑,但并非全部。

我的想法是,当您订阅可观察对象时,无论订阅者做什么,您都希望确保命令是 运行。订阅者应该只对产生值的可观察对象做出反应,如果它认为应该的话。

这是代码:

IObservable<Unit> query =
    intervals
        .Zip(commands, (Interval, Command) => new { Interval, Command })
        .Select(x =>
            Observable
                .Interval(x.Interval)
                .SelectMany(y =>
                    Observable
                        .Start(() => x.Command())))
        .Merge();

每当观察者订阅时,都会确保命令 运行。观察者可以报告它已完成。

最基本的订阅现在只有 IDisposable subscription = query.Subscribe();。简单。