为什么我需要在完成后处理订阅?
Why do I need to dispose of subscriptions after completion?
RX 简介一书将 OnSubscribe 上的 return 值描述为 IDisposible
,并指出在调用 OnError
和 OnCompleted
时应处理订阅。
An interesting thing to consider is that when a sequence completes or
errors, you should still dispose of your subscription.
From Intro to RX: Lifetime Management, OnError and OnCompleted
这是为什么?
作为参考,这是我目前正在研究的class。我可能会在某个时候将其提交给代码审查。
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
/// <summary>
/// Provides a timeout mechanism that will not timeout if it is signalled often enough
/// </summary>
internal class TrafficTimeout
{
private readonly Action onTimeout;
private object signalLock = new object();
private IObserver<Unit> signals;
/// <summary>
/// Initialises a new instance of the <see cref="TrafficTimeout"/> class.
/// </summary>
/// <param name="timeout">The duration to wait after receiving signals before timing out.</param>
/// <param name="onTimeout">The <see cref="Action"/> to perform when the the timeout duration expires.</param>
public TrafficTimeout(TimeSpan timeout, Action onTimeout)
{
// Subscribe to a throttled observable to trigger the expirey
var messageQueue = new BehaviorSubject<Unit>(Unit.Default);
IDisposable subscription = null;
subscription = messageQueue.Throttle(timeout).Subscribe(
p =>
{
messageQueue.OnCompleted();
messageQueue.Dispose();
});
this.signals = messageQueue.AsObserver();
this.onTimeout = onTimeout;
}
/// <summary>
/// Signals that traffic has been received.
/// </summary>
public void Signal()
{
lock (this.signalLock)
{
this.signals.OnNext(Unit.Default);
}
}
}
首先,这是一个您可以 运行 遇到的麻烦的例子:
void Main()
{
Console.WriteLine(GC.GetTotalMemory(true));
for (int i = 0; i < 1000; i++)
{
DumbSubscription();
Console.WriteLine(GC.GetTotalMemory(true));
}
Console.WriteLine(GC.GetTotalMemory(true));
}
public void DumbSubscription()
{
Observable.Interval(TimeSpan.FromMilliseconds(50))
.Subscribe(i => {});
}
您将看到您的内存使用量永远上升。活跃的 Rx 订阅不会被垃圾收集,并且这个可观察量是无限的。因此,如果您增加循环限制或添加延迟,您只会浪费更多内存:除了处理这些订阅外,没有任何帮助。
但是,假设我们将 DumbSubscription
的定义更改为:
public void DumbSubscription()
{
Observable.Interval(TimeSpan.FromMilliseconds(50))
.Take(1)
.Subscribe(i => {});
}
.Take(1)
加法意味着 observable 将在一个间隔后完成,因此它不再是无限的。您会看到您的内存使用量趋于稳定:订阅 倾向于 在完成或异常时正确处理它们自己。
然而,这并没有改变这样一个事实,即与任何其他 IDisposable
一样,最佳做法是调用 Dispose
(手动或通过 using
)以确保资源得到妥善处置。此外,如果你调整你的 observable,你可以很容易地 运行 解决开头指出的内存泄漏问题。
由 Subscribe
扩展方法编辑的一次性 return return 仅允许您在 之前手动取消订阅 observable observable 自然结束。
如果 observable 完成 - OnCompleted
或 OnError
- 那么订阅已经为您处理。
试试这个代码:
var xs = Observable.Create<int>(o =>
{
var d = Observable.Return(1).Subscribe(o);
return Disposable.Create(() =>
{
Console.WriteLine("Disposed!");
d.Dispose();
});
});
var subscription = xs.Subscribe(x => Console.WriteLine(x));
如果你运行上面的你会看到"Disposed!"在可观察完成时写入控制台,而你不需要在订阅上调用.Dispose()
。
需要注意的一件重要事情:垃圾收集器从不调用 .Dispose()
可观察订阅,因此您 必须 处理您的订阅,如果它们没有(或可能不会)已经)在您的订阅超出范围之前自然结束。
以此为例:
var wc = new WebClient();
var ds = Observable
.FromEventPattern<
DownloadStringCompletedEventHandler,
DownloadStringCompletedEventArgs>(
h => wc.DownloadStringCompleted += h,
h => wc.DownloadStringCompleted -= h);
var subscription =
ds.Subscribe(d =>
Console.WriteLine(d.EventArgs.Result));
ds
observable 只会在有订阅时附加到事件处理程序,并且只会在 observable 完成或订阅被处置时分离。由于它是一个事件处理程序,因此 observable 永远不会完成,因为它正在等待更多事件,因此处理是与事件分离的唯一方法(对于上面的示例)。
当你有一个你知道只有 return 的 FromEventPattern
可观察对象时,明智的做法是在订阅之前添加 .Take(1)
扩展方法,以允许事件处理程序自动分离,然后您不需要手动处理订阅。
像这样:
var ds = Observable
.FromEventPattern<
DownloadStringCompletedEventHandler,
DownloadStringCompletedEventArgs>(
h => wc.DownloadStringCompleted += h,
h => wc.DownloadStringCompleted -= h)
.Take(1);
希望对您有所帮助。
RX 简介一书将 OnSubscribe 上的 return 值描述为 IDisposible
,并指出在调用 OnError
和 OnCompleted
时应处理订阅。
An interesting thing to consider is that when a sequence completes or errors, you should still dispose of your subscription.
From Intro to RX: Lifetime Management, OnError and OnCompleted
这是为什么?
作为参考,这是我目前正在研究的class。我可能会在某个时候将其提交给代码审查。
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
/// <summary>
/// Provides a timeout mechanism that will not timeout if it is signalled often enough
/// </summary>
internal class TrafficTimeout
{
private readonly Action onTimeout;
private object signalLock = new object();
private IObserver<Unit> signals;
/// <summary>
/// Initialises a new instance of the <see cref="TrafficTimeout"/> class.
/// </summary>
/// <param name="timeout">The duration to wait after receiving signals before timing out.</param>
/// <param name="onTimeout">The <see cref="Action"/> to perform when the the timeout duration expires.</param>
public TrafficTimeout(TimeSpan timeout, Action onTimeout)
{
// Subscribe to a throttled observable to trigger the expirey
var messageQueue = new BehaviorSubject<Unit>(Unit.Default);
IDisposable subscription = null;
subscription = messageQueue.Throttle(timeout).Subscribe(
p =>
{
messageQueue.OnCompleted();
messageQueue.Dispose();
});
this.signals = messageQueue.AsObserver();
this.onTimeout = onTimeout;
}
/// <summary>
/// Signals that traffic has been received.
/// </summary>
public void Signal()
{
lock (this.signalLock)
{
this.signals.OnNext(Unit.Default);
}
}
}
首先,这是一个您可以 运行 遇到的麻烦的例子:
void Main()
{
Console.WriteLine(GC.GetTotalMemory(true));
for (int i = 0; i < 1000; i++)
{
DumbSubscription();
Console.WriteLine(GC.GetTotalMemory(true));
}
Console.WriteLine(GC.GetTotalMemory(true));
}
public void DumbSubscription()
{
Observable.Interval(TimeSpan.FromMilliseconds(50))
.Subscribe(i => {});
}
您将看到您的内存使用量永远上升。活跃的 Rx 订阅不会被垃圾收集,并且这个可观察量是无限的。因此,如果您增加循环限制或添加延迟,您只会浪费更多内存:除了处理这些订阅外,没有任何帮助。
但是,假设我们将 DumbSubscription
的定义更改为:
public void DumbSubscription()
{
Observable.Interval(TimeSpan.FromMilliseconds(50))
.Take(1)
.Subscribe(i => {});
}
.Take(1)
加法意味着 observable 将在一个间隔后完成,因此它不再是无限的。您会看到您的内存使用量趋于稳定:订阅 倾向于 在完成或异常时正确处理它们自己。
然而,这并没有改变这样一个事实,即与任何其他 IDisposable
一样,最佳做法是调用 Dispose
(手动或通过 using
)以确保资源得到妥善处置。此外,如果你调整你的 observable,你可以很容易地 运行 解决开头指出的内存泄漏问题。
由 Subscribe
扩展方法编辑的一次性 return return 仅允许您在 之前手动取消订阅 observable observable 自然结束。
如果 observable 完成 - OnCompleted
或 OnError
- 那么订阅已经为您处理。
试试这个代码:
var xs = Observable.Create<int>(o =>
{
var d = Observable.Return(1).Subscribe(o);
return Disposable.Create(() =>
{
Console.WriteLine("Disposed!");
d.Dispose();
});
});
var subscription = xs.Subscribe(x => Console.WriteLine(x));
如果你运行上面的你会看到"Disposed!"在可观察完成时写入控制台,而你不需要在订阅上调用.Dispose()
。
需要注意的一件重要事情:垃圾收集器从不调用 .Dispose()
可观察订阅,因此您 必须 处理您的订阅,如果它们没有(或可能不会)已经)在您的订阅超出范围之前自然结束。
以此为例:
var wc = new WebClient();
var ds = Observable
.FromEventPattern<
DownloadStringCompletedEventHandler,
DownloadStringCompletedEventArgs>(
h => wc.DownloadStringCompleted += h,
h => wc.DownloadStringCompleted -= h);
var subscription =
ds.Subscribe(d =>
Console.WriteLine(d.EventArgs.Result));
ds
observable 只会在有订阅时附加到事件处理程序,并且只会在 observable 完成或订阅被处置时分离。由于它是一个事件处理程序,因此 observable 永远不会完成,因为它正在等待更多事件,因此处理是与事件分离的唯一方法(对于上面的示例)。
当你有一个你知道只有 return 的 FromEventPattern
可观察对象时,明智的做法是在订阅之前添加 .Take(1)
扩展方法,以允许事件处理程序自动分离,然后您不需要手动处理订阅。
像这样:
var ds = Observable
.FromEventPattern<
DownloadStringCompletedEventHandler,
DownloadStringCompletedEventArgs>(
h => wc.DownloadStringCompleted += h,
h => wc.DownloadStringCompleted -= h)
.Take(1);
希望对您有所帮助。