反应式可观察订阅:停止订阅和续订订阅
Reactive Observable Subscription: Stop Subscription and Renew Subscription
我有一个来自 BlockCollection 的 Observable,我像队列一样使用它
IObservable<ProcessHoldTransactionData> GetObservable()
{
_queue.GetConsumingEnumerable().ToObservable(TaskPoolScheduler.Default);
}
并订阅他:
void StartSubscription()
{
_subscription = = GetObservable().Subscribe(
data => OnNextSubscribe(data),
ex => _logger.Error("Error"),
() => _logger.Warn("Complete"));
}
现在我有了另一个 Observable:
var timer = Observable.Interval(TimeSpan.FromSeconds(60));
_subscriptionTimer = timer.Subscribe(tick =>
{
OnTimerNextSubscribe();
});
我想在 OnTimerNextSubscribe 开始时停止订阅 _subscription 并在 OnTimerNextSubscribe 完成时更新它。
最好的做法是什么?
我应该处理 _subscription 并调用 StartSubscription()
基本上有两种选择:一种是处理然后重新启动,另一种是创建某种 on/off 可观察信号,然后相应地过滤 _subscription
:
void StartSubscription(Observable<bool> onOffSignal)
{
_subscription = = GetObservable()
.WithLatestFrom(onOffSignal, (s, b) => b ? Observable.Return(s) : Observable.Empty(s))
.Merge()
.Subscribe(
data => OnNextSubscribe(data),
ex => _logger.Error("Error"),
() => _logger.Warn("Complete")
);
}
我有一个来自 BlockCollection 的 Observable,我像队列一样使用它
IObservable<ProcessHoldTransactionData> GetObservable()
{
_queue.GetConsumingEnumerable().ToObservable(TaskPoolScheduler.Default);
}
并订阅他:
void StartSubscription()
{
_subscription = = GetObservable().Subscribe(
data => OnNextSubscribe(data),
ex => _logger.Error("Error"),
() => _logger.Warn("Complete"));
}
现在我有了另一个 Observable:
var timer = Observable.Interval(TimeSpan.FromSeconds(60));
_subscriptionTimer = timer.Subscribe(tick =>
{
OnTimerNextSubscribe();
});
我想在 OnTimerNextSubscribe 开始时停止订阅 _subscription 并在 OnTimerNextSubscribe 完成时更新它。
最好的做法是什么?
我应该处理 _subscription 并调用 StartSubscription()
基本上有两种选择:一种是处理然后重新启动,另一种是创建某种 on/off 可观察信号,然后相应地过滤 _subscription
:
void StartSubscription(Observable<bool> onOffSignal)
{
_subscription = = GetObservable()
.WithLatestFrom(onOffSignal, (s, b) => b ? Observable.Return(s) : Observable.Empty(s))
.Merge()
.Subscribe(
data => OnNextSubscribe(data),
ex => _logger.Error("Error"),
() => _logger.Warn("Complete")
);
}