通过永远不会发生的“FirstAsync”观察者任务避免资源泄漏

Avoiding resource leaks with `FirstAsync` observer tasks that never happen

我有一些软件使用基于事件的网络协议进行控制,它使用 IObservable<Event> 来处理绑定消息。

在许多情况下,发送的消息需要特定的响应(或顺序,例如报告进度)。为了避免可能错过响应,正在使用 FirstAsyncToTask 提前设置任务,但是如果任务从未完成,这些似乎会泄漏。

也不允许简单地将 evtTask 放在 using 块中,因为不允许尝试处理未完成的任务。

var jobUuid = Guid.NewGuid();
var evtTask = Events.FirstAsync((x) => x.Action == Action.JobComplete && x.JobUuid == jobUuid).ToTask();
// e.g. if this throws without ever sending the message
await SendMessage($"job {jobUuid} download {url}");
var evt = await evtTask;
if (evt.Success)
{
    ...
}

该库是否为该用例提供了一种在离开范围时取消订阅的简单方法?

var jobUuid = Guid.NewGuid();
using(var evtTask = Events.FirstAsync((x) => x.Action == Action.JobComplete && x.JobUuid == jobUuid)
    .ToDisposableTask())) // Some method like this
{
    // e.g. if this throws without ever sending the message
    await SendMessage($"job {jobUuid} download {url}");
    var evt = await evtTask;
    if (evt.Success)
    {
        ...
    }
} // Get rid of the FirstAsync task if leave here before it completes for any reason

您可以使用 TPL Timeout(由@Fabjan 引用),或 Rx/System.Reactive 版本的 Timeout。

using 听起来不错,但没有意义。 using 相当于在 using 块末尾的某处调用 .Dispose 。我假设这里的问题是您的代码永远不会超过 await evtTask。将所有这些都放在假设的 using 中不会改变任何东西:您的代码仍在永远等待。

在更高的层次上,你的代码是命令式的而不是反应式的,你可能想将它重构为这样的东西:

var subscription = Events
    .Where(x => x.Action == Action.JobComplete)
    .Subscribe(x => 
    {
        if(x.Success)
        {
            //...
        }
        else
        {
            //...
        }
    });

处理 Task 无济于事,因为它没有任何用处(在大多数情况下,包括这种情况)。不过,取消任务会有帮助。取消处理由 ToTask 创建的基础订阅,因此解决此 "leak"。

所以它可以像这样:

Task<Event> evtTask;
using (var cts = new CancellationTokenSource()) {
    evtTask = Events.FirstAsync((x) => x.Action == Action.JobComplete && x.JobUuid == jobUuid)
             .ToTask(cts.Token);
    // e.g. if this throws without ever sending the message
    try {
        await SendMessage($"job {jobUuid} download {url}");
    }
    catch {
        cts.Cancel(); // disposes subscription
        throw;
    }
}
var evt = await evtTask;
if (evt.Success)
{
    ...
}

当然你可以用一些更方便的形式(比如扩展方法)来包装它。例如:

public static class ObservableExtensions {
    public static CancellableTaskWrapper<T> ToCancellableTask<T>(this IObservable<T> source) {
        return new CancellableTaskWrapper<T>(source);
    }

    public class CancellableTaskWrapper<T> : IDisposable
    {
        private readonly CancellationTokenSource _cts;
        public CancellableTaskWrapper(IObservable<T> source)
        {
            _cts = new CancellationTokenSource();
            Task = source.ToTask(_cts.Token);
        }

        public Task<T> Task { get; }

        public void Dispose()
        {
            _cts.Cancel();
            _cts.Dispose();
        }
    }
}

然后它变得接近你想要的:

var jobUuid = Guid.NewGuid();
using (var evtTask = Events.FirstAsync((x) => x.Action == Action.JobComplete && x.JobUuid == jobUuid).ToCancellableTask()) {
    await SendMessage($"job {jobUuid} download {url}");
    var evt = await evtTask.Task;
    if (evt.Success) {
        ...
    }
}