如何将 Observable 与状态配对?
How to Pair an Observable with State?
我正在学习 Rx.Net,我正在构建一个以多个间隔运行的调度程序。我的想法是我有一组间隔和一组命令,我将它们组合在一起并订阅合并结果。每个命令都将与其各自的间隔相关联,并在与其关联定义的间隔(节奏)上执行。
虽然我能够将间隔合并在一起,但我费了好大的劲才想出如何将此命令(状态)传递给订阅。
我确实看到了 Scan
函数,但这看起来是聚合而不是 couple/tuple。这是我的代码:
var intervals = new[]
{
TimeSpan.FromSeconds(30),
TimeSpan.FromSeconds(60),
TimeSpan.FromSeconds(90)
};
var commands = new Action[]
{
() => Console.WriteLine("30 Seconds!"),
() => Console.WriteLine("60 Seconds!"),
() => Console.WriteLine("90 Seconds!")
};
intervals.Zip(commands, ValueTuple.Create)
.Select(x => Observable.Interval(x.Item1)) // <-- Need magic here. :)
.Merge()
.Subscribe(x =>
{
x.Item2(); // Broken, x is a long.
});
Rx 中有没有一种方法可以实现我在这里想要实现的目标?类似于 Scan
但不是累加器,而是元组(如果这是一个词)。
如果您希望以 30、60、90 秒的间隔重复此操作:
intervals.Zip(commands, ValueTuple.Create)
.Select(x => Observable.Interval(x.Item1).Select(_ => x))
.Merge()
.Subscribe(x =>
{
x.Item2(); // Broken, x is a long.
});
如果您只想触发一次操作:
intervals.Zip(commands, ValueTuple.Create)
.Select(x => Observable.Delay(Observable.Return(x), x.Item1))
.Merge()
.Subscribe(x =>
{
x.Item2(); // Broken, x is a long.
});
我将为您需要的查询使用稍微不同的版本。
现在,您代码的查询部分(.Merge()
之前的位(包括 .Merge()
仅选择命令 - 并且您依赖订阅者执行命令。您信任订阅者为您执行命令。在某些情况下这可能是正确的逻辑,但并非全部。
我的想法是,当您订阅可观察对象时,无论订阅者做什么,您都希望确保命令是 运行。订阅者应该只对产生值的可观察对象做出反应,如果它认为应该的话。
这是代码:
IObservable<Unit> query =
intervals
.Zip(commands, (Interval, Command) => new { Interval, Command })
.Select(x =>
Observable
.Interval(x.Interval)
.SelectMany(y =>
Observable
.Start(() => x.Command())))
.Merge();
每当观察者订阅时,都会确保命令 运行。观察者可以报告它已完成。
最基本的订阅现在只有 IDisposable subscription = query.Subscribe();
。简单。
我正在学习 Rx.Net,我正在构建一个以多个间隔运行的调度程序。我的想法是我有一组间隔和一组命令,我将它们组合在一起并订阅合并结果。每个命令都将与其各自的间隔相关联,并在与其关联定义的间隔(节奏)上执行。
虽然我能够将间隔合并在一起,但我费了好大的劲才想出如何将此命令(状态)传递给订阅。
我确实看到了 Scan
函数,但这看起来是聚合而不是 couple/tuple。这是我的代码:
var intervals = new[]
{
TimeSpan.FromSeconds(30),
TimeSpan.FromSeconds(60),
TimeSpan.FromSeconds(90)
};
var commands = new Action[]
{
() => Console.WriteLine("30 Seconds!"),
() => Console.WriteLine("60 Seconds!"),
() => Console.WriteLine("90 Seconds!")
};
intervals.Zip(commands, ValueTuple.Create)
.Select(x => Observable.Interval(x.Item1)) // <-- Need magic here. :)
.Merge()
.Subscribe(x =>
{
x.Item2(); // Broken, x is a long.
});
Rx 中有没有一种方法可以实现我在这里想要实现的目标?类似于 Scan
但不是累加器,而是元组(如果这是一个词)。
如果您希望以 30、60、90 秒的间隔重复此操作:
intervals.Zip(commands, ValueTuple.Create)
.Select(x => Observable.Interval(x.Item1).Select(_ => x))
.Merge()
.Subscribe(x =>
{
x.Item2(); // Broken, x is a long.
});
如果您只想触发一次操作:
intervals.Zip(commands, ValueTuple.Create)
.Select(x => Observable.Delay(Observable.Return(x), x.Item1))
.Merge()
.Subscribe(x =>
{
x.Item2(); // Broken, x is a long.
});
我将为您需要的查询使用稍微不同的版本。
现在,您代码的查询部分(.Merge()
之前的位(包括 .Merge()
仅选择命令 - 并且您依赖订阅者执行命令。您信任订阅者为您执行命令。在某些情况下这可能是正确的逻辑,但并非全部。
我的想法是,当您订阅可观察对象时,无论订阅者做什么,您都希望确保命令是 运行。订阅者应该只对产生值的可观察对象做出反应,如果它认为应该的话。
这是代码:
IObservable<Unit> query =
intervals
.Zip(commands, (Interval, Command) => new { Interval, Command })
.Select(x =>
Observable
.Interval(x.Interval)
.SelectMany(y =>
Observable
.Start(() => x.Command())))
.Merge();
每当观察者订阅时,都会确保命令 运行。观察者可以报告它已完成。
最基本的订阅现在只有 IDisposable subscription = query.Subscribe();
。简单。