订阅/重新订阅一个 Observable 流

Subscribe / Resubscribe to an Observable Stream

我 运行 遇到了一个问题,我想在谓词为真时订阅一个可观察流,并在谓词为假时停止订阅。当谓词在未来某个时刻再次为真时,它应该重新订阅可观察流。

用例:

我将我的可观察流作为输入 (IObservable<IList<LogEntity>> items) 如果我无法将日志实体插入数据库,它应该取消订阅该流,并且当数据库备份时 运行ning 它应该自动订阅流(基于 属性 IsSubscribed)并开始插入数据。

我自己的尝试:

我已经尝试了以下没有的方法:

var groups = from dataItem in items.SelectMany(o => o.GroupBy(i => i.EntityType))
    where dataItem.Any()
    select new {Type = dataItem.Key, List = dataItem.Select(o => o)};

groups
    .TakeWhile(o => IsSubscribed)
    .SubscribeOn(_scheduler)
    .Repeat()
    .Subscribe(o => Insert(o.Type, o.List));

基于属性IsSubscribed,我想直播订阅和退订。当 TakeWhile 为真时 OnCompleted 被调用,当 Subscribe 之后将不起作用。 旁注:这是一个冷的可观察流

问题:

如何创建一个可观察流,我可以在其中订阅和取消订阅任意多次(有点像 C# 中的事件处理程序)

感谢您提前提供帮助

你要的是添加 团体 .Delay(group.SelectMany(WaitForDatabaseUp))

public async Task WaitForDatabaseUp()
{
    //If IsSubscribed continue execution
    if(IsSubscribed) return;
    //Else wait until IsSubscribed == true
    await this.ObservableForProperty(x => x.IsSubscribed, skipInitial: false)
                       .Value()
                       .Where(isSubscribed => isSubscribed)
                       .Take(1);
}

使用您最喜欢的框架将 INPC 转换为 Observable,您可以在其中看到 ObserveProperty()

基本上我们将内联一个仅在 IsSubscribed == true 时 returns 的任务。然后将该 Task 转换为 Observable,以便与 Rx 兼容。

看起来像是一个重复的问题。

但是,从Pause and Resume Subscription on cold IObservable中提取代码,可以调整为

var subscription = Observable.Create<IObservable<YourType>>(o =>
{
    var current = groups.Replay();
    var connection = new SerialDisposable();
    connection.Disposable = current.Connect();

    return IsSubscribed
        .DistinctUntilChanged()
        .Select(isRunning =>
        {
            if (isRunning)
            {
                //Return the current replayed values.
                return current;
            }
            else
            {
                //Disconnect and replace current.
                current = source.Replay();
                connection.Disposable = current.Connect();
                //yield silence until the next time we resume.
                return Observable.Never<YourType>();
            }

        })
        .Subscribe(o);
})
.Switch()
.Subscribe(o => Insert(o.Type, o.List));

你可以看到 Matt Barrett(和我)谈论它 here。我建议观看整个视频(可能是 2 倍速)以了解完整的上下文。