通过永远不会发生的“FirstAsync”观察者任务避免资源泄漏
Avoiding resource leaks with `FirstAsync` observer tasks that never happen
我有一些软件使用基于事件的网络协议进行控制,它使用 IObservable<Event>
来处理绑定消息。
在许多情况下,发送的消息需要特定的响应(或顺序,例如报告进度)。为了避免可能错过响应,正在使用 FirstAsync
和 ToTask
提前设置任务,但是如果任务从未完成,这些似乎会泄漏。
也不允许简单地将 evtTask
放在 using
块中,因为不允许尝试处理未完成的任务。
var jobUuid = Guid.NewGuid();
var evtTask = Events.FirstAsync((x) => x.Action == Action.JobComplete && x.JobUuid == jobUuid).ToTask();
// e.g. if this throws without ever sending the message
await SendMessage($"job {jobUuid} download {url}");
var evt = await evtTask;
if (evt.Success)
{
...
}
该库是否为该用例提供了一种在离开范围时取消订阅的简单方法?
var jobUuid = Guid.NewGuid();
using(var evtTask = Events.FirstAsync((x) => x.Action == Action.JobComplete && x.JobUuid == jobUuid)
.ToDisposableTask())) // Some method like this
{
// e.g. if this throws without ever sending the message
await SendMessage($"job {jobUuid} download {url}");
var evt = await evtTask;
if (evt.Success)
{
...
}
} // Get rid of the FirstAsync task if leave here before it completes for any reason
您可以使用 TPL Timeout(由@Fabjan 引用),或 Rx/System.Reactive 版本的 Timeout。
using
听起来不错,但没有意义。 using 相当于在 using 块末尾的某处调用 .Dispose
。我假设这里的问题是您的代码永远不会超过 await evtTask
。将所有这些都放在假设的 using
中不会改变任何东西:您的代码仍在永远等待。
在更高的层次上,你的代码是命令式的而不是反应式的,你可能想将它重构为这样的东西:
var subscription = Events
.Where(x => x.Action == Action.JobComplete)
.Subscribe(x =>
{
if(x.Success)
{
//...
}
else
{
//...
}
});
处理 Task
无济于事,因为它没有任何用处(在大多数情况下,包括这种情况)。不过,取消任务会有帮助。取消处理由 ToTask
创建的基础订阅,因此解决此 "leak"。
所以它可以像这样:
Task<Event> evtTask;
using (var cts = new CancellationTokenSource()) {
evtTask = Events.FirstAsync((x) => x.Action == Action.JobComplete && x.JobUuid == jobUuid)
.ToTask(cts.Token);
// e.g. if this throws without ever sending the message
try {
await SendMessage($"job {jobUuid} download {url}");
}
catch {
cts.Cancel(); // disposes subscription
throw;
}
}
var evt = await evtTask;
if (evt.Success)
{
...
}
当然你可以用一些更方便的形式(比如扩展方法)来包装它。例如:
public static class ObservableExtensions {
public static CancellableTaskWrapper<T> ToCancellableTask<T>(this IObservable<T> source) {
return new CancellableTaskWrapper<T>(source);
}
public class CancellableTaskWrapper<T> : IDisposable
{
private readonly CancellationTokenSource _cts;
public CancellableTaskWrapper(IObservable<T> source)
{
_cts = new CancellationTokenSource();
Task = source.ToTask(_cts.Token);
}
public Task<T> Task { get; }
public void Dispose()
{
_cts.Cancel();
_cts.Dispose();
}
}
}
然后它变得接近你想要的:
var jobUuid = Guid.NewGuid();
using (var evtTask = Events.FirstAsync((x) => x.Action == Action.JobComplete && x.JobUuid == jobUuid).ToCancellableTask()) {
await SendMessage($"job {jobUuid} download {url}");
var evt = await evtTask.Task;
if (evt.Success) {
...
}
}
我有一些软件使用基于事件的网络协议进行控制,它使用 IObservable<Event>
来处理绑定消息。
在许多情况下,发送的消息需要特定的响应(或顺序,例如报告进度)。为了避免可能错过响应,正在使用 FirstAsync
和 ToTask
提前设置任务,但是如果任务从未完成,这些似乎会泄漏。
也不允许简单地将 evtTask
放在 using
块中,因为不允许尝试处理未完成的任务。
var jobUuid = Guid.NewGuid();
var evtTask = Events.FirstAsync((x) => x.Action == Action.JobComplete && x.JobUuid == jobUuid).ToTask();
// e.g. if this throws without ever sending the message
await SendMessage($"job {jobUuid} download {url}");
var evt = await evtTask;
if (evt.Success)
{
...
}
该库是否为该用例提供了一种在离开范围时取消订阅的简单方法?
var jobUuid = Guid.NewGuid();
using(var evtTask = Events.FirstAsync((x) => x.Action == Action.JobComplete && x.JobUuid == jobUuid)
.ToDisposableTask())) // Some method like this
{
// e.g. if this throws without ever sending the message
await SendMessage($"job {jobUuid} download {url}");
var evt = await evtTask;
if (evt.Success)
{
...
}
} // Get rid of the FirstAsync task if leave here before it completes for any reason
您可以使用 TPL Timeout(由@Fabjan 引用),或 Rx/System.Reactive 版本的 Timeout。
using
听起来不错,但没有意义。 using 相当于在 using 块末尾的某处调用 .Dispose
。我假设这里的问题是您的代码永远不会超过 await evtTask
。将所有这些都放在假设的 using
中不会改变任何东西:您的代码仍在永远等待。
在更高的层次上,你的代码是命令式的而不是反应式的,你可能想将它重构为这样的东西:
var subscription = Events
.Where(x => x.Action == Action.JobComplete)
.Subscribe(x =>
{
if(x.Success)
{
//...
}
else
{
//...
}
});
处理 Task
无济于事,因为它没有任何用处(在大多数情况下,包括这种情况)。不过,取消任务会有帮助。取消处理由 ToTask
创建的基础订阅,因此解决此 "leak"。
所以它可以像这样:
Task<Event> evtTask;
using (var cts = new CancellationTokenSource()) {
evtTask = Events.FirstAsync((x) => x.Action == Action.JobComplete && x.JobUuid == jobUuid)
.ToTask(cts.Token);
// e.g. if this throws without ever sending the message
try {
await SendMessage($"job {jobUuid} download {url}");
}
catch {
cts.Cancel(); // disposes subscription
throw;
}
}
var evt = await evtTask;
if (evt.Success)
{
...
}
当然你可以用一些更方便的形式(比如扩展方法)来包装它。例如:
public static class ObservableExtensions {
public static CancellableTaskWrapper<T> ToCancellableTask<T>(this IObservable<T> source) {
return new CancellableTaskWrapper<T>(source);
}
public class CancellableTaskWrapper<T> : IDisposable
{
private readonly CancellationTokenSource _cts;
public CancellableTaskWrapper(IObservable<T> source)
{
_cts = new CancellationTokenSource();
Task = source.ToTask(_cts.Token);
}
public Task<T> Task { get; }
public void Dispose()
{
_cts.Cancel();
_cts.Dispose();
}
}
}
然后它变得接近你想要的:
var jobUuid = Guid.NewGuid();
using (var evtTask = Events.FirstAsync((x) => x.Action == Action.JobComplete && x.JobUuid == jobUuid).ToCancellableTask()) {
await SendMessage($"job {jobUuid} download {url}");
var evt = await evtTask.Task;
if (evt.Success) {
...
}
}