仅发射一个可观察值,而其他发射一些值

Emit one observable only while other is emitting some values

我正在编写一种监控指定远程机器的监控工具。 它会检索机器是否启动,IIS 是否 运行 以及更多参数。
机器 up/down 状态可以每隔 3 分钟监控一次。 并且可以每隔 ~1 分钟监视一次 IIS 状态。
如果其中一些参数发生变化,它将记录更改 (DistinctUntilChanged),如果它保持不变,则不会执行任何操作。

我为机器状态创建了一个可观察对象,为 IIS 状态创建了一个。 显然,如果机器停机,则无需监视任何其他远程参数(无需调用 GetIisState)。
因此,我尝试使用 SkipUntilTakeUntil – 尝试在机器停机时暂停 IisObservable。 但它并没有像我希望的那样工作。

这是我的机器状态可观察值:

machineStateObservable = Observable.Interval(TimeSpan.FromSeconds(180))
    .StartWith(-1)
    .Select(async it => new {Order = it, State = await GetMachineStateAsync(machine)})
    .Switch()
    .Do(it =>
    {
        if (it.Order == -1) // write initial state 1st time
        {
            var state = it.State ? "up" : "down";
            var msg = $"{machine.Name}: Machine initial state: {state}";
            Log.Info(msg);
        }
    })
    .Select(it => it.State);

machineStateObservable
    .DistinctUntilChanged()
    .Buffer(2, 1).Where(it => it.Count == 2)
    .Subscribe(it => Log.Info($"{machine.Name}: MachineState got changed from {it[0]} to: {it[1]}")
        , ex => Log.Error(ex, "Unhandled exception!"));

我应该如何定义 IisObservable 它不会发出通知(因此不会调用 GetIisState)而 machineStateObservable 表示机器已关闭?

更新:
这是我在@Enigmativity 的帮助下得出的解决方案:

IisStateObservable = MachineStateObservable
                .Select(state => state
                    ? Observable.Interval(TimeSpan.FromSeconds(2)).StartWith(0)
                                .SelectMany(it => Readings.GetServiceStateAsync())
                    : Observable.Never<string>())
                .Switch()
                .Publish();

这是创建仅在另一个可观察对象产生 true 而不是在它产生 false.

时发出的可观察对象的基本模式
void Main()
{
    var states = new Subject<bool>();

    IObservable<int> query =
        states
            .Select(state => state
                ? Observable.FromAsync(() => GetStatusAsync())
                : Observable.Never<int>())
            .Switch();
}

public async Task<int> GetStatusAsync()
{
    return await Task.Factory.StartNew(() => 42);
}

这是我建议定期调用的代码。

void Main()
{
    var states = new Subject<bool>();

    IObservable<int> query =
    (
        from n in Observable.Interval(TimeSpan.FromMinutes(1.0))
        from ms in Observable.FromAsync(() => GetMachineStateAsync())
        select ms
            ? Observable.FromAsync(() => GetStatusAsync())
            : Observable.Never<int>()
    ).Switch();
}

public async Task<int> GetStatusAsync()
{
    return await Task.Factory.StartNew(() => 42);
}

public async Task<bool> GetMachineStateAsync()
{
    return await Task.Factory.StartNew(() => true);
}

或者,根据您建议的答案。

void Main()
{
    var states = new Subject<bool>();

    IObservable<int> query =
        states
            .Select(state => state
                ? Observable
                    .Interval(TimeSpan.FromSeconds(2.0))
                    .StartWith(-1L)
                    .SelectMany(n =>
                        Observable.FromAsync(() => GetStatusAsync()))
                : Observable.Never<int>())
            .Switch();
}

public async Task<int> GetStatusAsync()
{
    return await Task.Factory.StartNew(() => 42);
}