订阅/重新订阅一个 Observable 流
Subscribe / Resubscribe to an Observable Stream
我 运行 遇到了一个问题,我想在谓词为真时订阅一个可观察流,并在谓词为假时停止订阅。当谓词在未来某个时刻再次为真时,它应该重新订阅可观察流。
用例:
我将我的可观察流作为输入 (IObservable<IList<LogEntity>> items
) 如果我无法将日志实体插入数据库,它应该取消订阅该流,并且当数据库备份时 运行ning 它应该自动订阅流(基于 属性 IsSubscribed
)并开始插入数据。
我自己的尝试:
我已经尝试了以下没有的方法:
var groups = from dataItem in items.SelectMany(o => o.GroupBy(i => i.EntityType))
where dataItem.Any()
select new {Type = dataItem.Key, List = dataItem.Select(o => o)};
groups
.TakeWhile(o => IsSubscribed)
.SubscribeOn(_scheduler)
.Repeat()
.Subscribe(o => Insert(o.Type, o.List));
基于属性IsSubscribed
,我想直播订阅和退订。当 TakeWhile
为真时 OnCompleted
被调用,当 Subscribe
之后将不起作用。 旁注:这是一个冷的可观察流
问题:
如何创建一个可观察流,我可以在其中订阅和取消订阅任意多次(有点像 C# 中的事件处理程序)
感谢您提前提供帮助
你要的是添加
团体
.Delay(group.SelectMany(WaitForDatabaseUp))
public async Task WaitForDatabaseUp()
{
//If IsSubscribed continue execution
if(IsSubscribed) return;
//Else wait until IsSubscribed == true
await this.ObservableForProperty(x => x.IsSubscribed, skipInitial: false)
.Value()
.Where(isSubscribed => isSubscribed)
.Take(1);
}
使用您最喜欢的框架将 INPC 转换为 Observable,您可以在其中看到 ObserveProperty()
基本上我们将内联一个仅在 IsSubscribed == true
时 returns 的任务。然后将该 Task 转换为 Observable,以便与 Rx 兼容。
看起来像是一个重复的问题。
但是,从Pause and Resume Subscription on cold IObservable中提取代码,可以调整为
var subscription = Observable.Create<IObservable<YourType>>(o =>
{
var current = groups.Replay();
var connection = new SerialDisposable();
connection.Disposable = current.Connect();
return IsSubscribed
.DistinctUntilChanged()
.Select(isRunning =>
{
if (isRunning)
{
//Return the current replayed values.
return current;
}
else
{
//Disconnect and replace current.
current = source.Replay();
connection.Disposable = current.Connect();
//yield silence until the next time we resume.
return Observable.Never<YourType>();
}
})
.Subscribe(o);
})
.Switch()
.Subscribe(o => Insert(o.Type, o.List));
你可以看到 Matt Barrett(和我)谈论它 here。我建议观看整个视频(可能是 2 倍速)以了解完整的上下文。
我 运行 遇到了一个问题,我想在谓词为真时订阅一个可观察流,并在谓词为假时停止订阅。当谓词在未来某个时刻再次为真时,它应该重新订阅可观察流。
用例:
我将我的可观察流作为输入 (IObservable<IList<LogEntity>> items
) 如果我无法将日志实体插入数据库,它应该取消订阅该流,并且当数据库备份时 运行ning 它应该自动订阅流(基于 属性 IsSubscribed
)并开始插入数据。
我自己的尝试:
我已经尝试了以下没有的方法:
var groups = from dataItem in items.SelectMany(o => o.GroupBy(i => i.EntityType))
where dataItem.Any()
select new {Type = dataItem.Key, List = dataItem.Select(o => o)};
groups
.TakeWhile(o => IsSubscribed)
.SubscribeOn(_scheduler)
.Repeat()
.Subscribe(o => Insert(o.Type, o.List));
基于属性IsSubscribed
,我想直播订阅和退订。当 TakeWhile
为真时 OnCompleted
被调用,当 Subscribe
之后将不起作用。 旁注:这是一个冷的可观察流
问题:
如何创建一个可观察流,我可以在其中订阅和取消订阅任意多次(有点像 C# 中的事件处理程序)
感谢您提前提供帮助
你要的是添加 团体 .Delay(group.SelectMany(WaitForDatabaseUp))
public async Task WaitForDatabaseUp()
{
//If IsSubscribed continue execution
if(IsSubscribed) return;
//Else wait until IsSubscribed == true
await this.ObservableForProperty(x => x.IsSubscribed, skipInitial: false)
.Value()
.Where(isSubscribed => isSubscribed)
.Take(1);
}
使用您最喜欢的框架将 INPC 转换为 Observable,您可以在其中看到 ObserveProperty()
基本上我们将内联一个仅在 IsSubscribed == true
时 returns 的任务。然后将该 Task 转换为 Observable,以便与 Rx 兼容。
看起来像是一个重复的问题。
但是,从Pause and Resume Subscription on cold IObservable中提取代码,可以调整为
var subscription = Observable.Create<IObservable<YourType>>(o =>
{
var current = groups.Replay();
var connection = new SerialDisposable();
connection.Disposable = current.Connect();
return IsSubscribed
.DistinctUntilChanged()
.Select(isRunning =>
{
if (isRunning)
{
//Return the current replayed values.
return current;
}
else
{
//Disconnect and replace current.
current = source.Replay();
connection.Disposable = current.Connect();
//yield silence until the next time we resume.
return Observable.Never<YourType>();
}
})
.Subscribe(o);
})
.Switch()
.Subscribe(o => Insert(o.Type, o.List));
你可以看到 Matt Barrett(和我)谈论它 here。我建议观看整个视频(可能是 2 倍速)以了解完整的上下文。