C# Abortable Asynchronous Fifo Queue - 泄漏大量内存

C# Abortable Asynchronous Fifo Queue - leaking massive amounts of memory

我需要以 FIFO 方式处理来自生产者的数据,并且能够在同一生产者产生新的数据位时中止处理。

所以我基于 Stephen Cleary 的 AsyncCollection(在我的示例中称为 AsyncCollectionAbortableFifoQueue)和 TPL 的 BufferBlock(在我的示例中称为 BufferBlockAbortableAsyncFifoQueue)实现了一个可中止的 FIFO 队列).这是基于 AsyncCollection

的实现
public class AsyncCollectionAbortableFifoQueue<T> : IExecutableAsyncFifoQueue<T>
{
    private AsyncCollection<AsyncWorkItem<T>> taskQueue = new AsyncCollection<AsyncWorkItem<T>>();
    private readonly CancellationToken stopProcessingToken;

    public AsyncCollectionAbortableFifoQueue(CancellationToken cancelToken)
    {
        stopProcessingToken = cancelToken;
        _ = processQueuedItems();
    }

    public Task<T> EnqueueTask(Func<Task<T>> action, CancellationToken? cancelToken)
    {
        var tcs = new TaskCompletionSource<T>();
        var item = new AsyncWorkItem<T>(tcs, action, cancelToken);
        taskQueue.Add(item);
        return tcs.Task;
    }

    protected virtual async Task processQueuedItems()
    {
        while (!stopProcessingToken.IsCancellationRequested)
        {
            try
            {
                var item = await taskQueue.TakeAsync(stopProcessingToken).ConfigureAwait(false);
                if (item.CancelToken.HasValue && item.CancelToken.Value.IsCancellationRequested)
                    item.TaskSource.SetCanceled();
                else
                {
                    try
                    {
                        T result = await item.Action().ConfigureAwait(false);
                        item.TaskSource.SetResult(result);   // Indicate completion
                    }
                    catch (Exception ex)
                    {
                        if (ex is OperationCanceledException && ((OperationCanceledException)ex).CancellationToken == item.CancelToken)
                            item.TaskSource.SetCanceled();
                        item.TaskSource.SetException(ex);
                    }
                }
            }
            catch (Exception) { }
        }
    }
}

public interface IExecutableAsyncFifoQueue<T>
{
    Task<T> EnqueueTask(Func<Task<T>> action, CancellationToken? cancelToken);
}

processQueuedItems 是将 AsyncWorkItem 从队列中取出并执行它们的任务,除非已请求取消。

要执行的异步操作被包装到一个 AsyncWorkItem 中,看起来像这样

internal class AsyncWorkItem<T>
{
    public readonly TaskCompletionSource<T> TaskSource;
    public readonly Func<Task<T>> Action;
    public readonly CancellationToken? CancelToken;

    public AsyncWorkItem(TaskCompletionSource<T> taskSource, Func<Task<T>> action, CancellationToken? cancelToken)
    {
        TaskSource = taskSource;
        Action = action;
        CancelToken = cancelToken;
    }
}

然后有一个任务查找项目并将其出列以进行处理,或者处理它们,或者如果 CancellationToken 已被触发则中止。

一切正常 - 数据得到处理,如果收到新数据,旧数据的处理将中止。我现在的问题是,如果我提高使用率(生产者生产的产品比消费者生产的产品多得多),这些队列会泄漏大量内存。鉴于它是可中止的,未处理的数据应该被丢弃并最终从内存中消失。

那么让我们看看我是如何使用这些队列的。我有 1:1 生产者和消费者匹配。每个消费者处理单个生产者的数据。每当我得到一个新的数据项,并且它与前一个不匹配时,我就会捕获给定生产者的队列(User.UserId)或创建一个新的(代码片段中的 'executor') .然后我有一个 ConcurrentDictionary 每个 producer/consumer 组合包含一个 CancellationTokenSource。如果前面有一个 CancellationTokenSource,我会在它上面调用 Cancel,并在 20 秒后调用 Dispose(立即处理会导致队列中出现异常)。然后我对新数据进行排队处理。队列 return 给我一个我可以等待的任务,所以我知道数据处理何时完成,然后我 return 结果。

这是代码

internal class SimpleLeakyConsumer
{
    private ConcurrentDictionary<string, IExecutableAsyncFifoQueue<bool>> groupStateChangeExecutors = new ConcurrentDictionary<string, IExecutableAsyncFifoQueue<bool>>();
    private readonly ConcurrentDictionary<string, CancellationTokenSource> userStateChangeAborters = new ConcurrentDictionary<string, CancellationTokenSource>();
    protected CancellationTokenSource serverShutDownSource;
    private readonly int operationDuration = 1000;

    internal SimpleLeakyConsumer(CancellationTokenSource serverShutDownSource, int operationDuration)
    {
        this.serverShutDownSource = serverShutDownSource;
        this.operationDuration = operationDuration * 1000; // convert from seconds to milliseconds
    }

    internal async Task<bool> ProcessStateChange(string userId)
    {
        var executor = groupStateChangeExecutors.GetOrAdd(userId, new AsyncCollectionAbortableFifoQueue<bool>(serverShutDownSource.Token));
        CancellationTokenSource oldSource = null;
        using (var cancelSource = userStateChangeAborters.AddOrUpdate(userId, new CancellationTokenSource(), (key, existingValue) =>
        {
            oldSource = existingValue;
            return new CancellationTokenSource();
        }))
        {
            if (oldSource != null && !oldSource.IsCancellationRequested)
            {
                oldSource.Cancel();
                _ = delayedDispose(oldSource);
            }
            try
            {
                var executionTask = executor.EnqueueTask(async () => { await Task.Delay(operationDuration, cancelSource.Token).ConfigureAwait(false); return true; }, cancelSource.Token);
                var result = await executionTask.ConfigureAwait(false);
                userStateChangeAborters.TryRemove(userId, out var aborter);
                return result;
            }
            catch (Exception e)
            {
                if (e is TaskCanceledException || e is OperationCanceledException)
                    return true;
                else
                {
                    userStateChangeAborters.TryRemove(userId, out var aborter);
                    return false;
                }
            }
        }
    }

    private async Task delayedDispose(CancellationTokenSource src)
    {
        try
        {
            await Task.Delay(20 * 1000).ConfigureAwait(false);
        }
        finally
        {
            try
            {
                src.Dispose();
            }
            catch (ObjectDisposedException) { }
        }
    }
}

在此示例实现中,所做的只是等待,然后 return 为真。

为了测试这个机制,我写了下面的数据生产者class:

internal class SimpleProducer
{

    //variables defining the test
    readonly int nbOfusers = 10;
    readonly int minimumDelayBetweenTest = 1; // seconds
    readonly int maximumDelayBetweenTests = 6; // seconds
    readonly int operationDuration = 3; // number of seconds an operation takes in the tester

    private readonly Random rand;
    private List<User> users;
    private readonly SimpleLeakyConsumer consumer;

    protected CancellationTokenSource serverShutDownSource, testAbortSource;
    private CancellationToken internalToken = CancellationToken.None;

    internal SimpleProducer()
    {
        rand = new Random();
        testAbortSource = new CancellationTokenSource();
        serverShutDownSource = new CancellationTokenSource();
        generateTestObjects(nbOfusers, 0, false);
        consumer = new SimpleLeakyConsumer(serverShutDownSource, operationDuration);
    }

    internal void StartTests()
    {
        if (internalToken == CancellationToken.None || internalToken.IsCancellationRequested)
        {
            internalToken = testAbortSource.Token;
            foreach (var user in users)
                _ = setNewUserPresence(internalToken, user);
        }
    }

    internal void StopTests()
    {
        testAbortSource.Cancel();
        try
        {
            testAbortSource.Dispose();
        }
        catch (ObjectDisposedException) { }
        testAbortSource = new CancellationTokenSource();
    }

    internal void Shutdown()
    {
        serverShutDownSource.Cancel();
    }

    private async Task setNewUserPresence(CancellationToken token, User user)
    {
        while (!token.IsCancellationRequested)
        {
            var nextInterval = rand.Next(minimumDelayBetweenTest, maximumDelayBetweenTests);
            try
            {
                await Task.Delay(nextInterval * 1000, testAbortSource.Token).ConfigureAwait(false);
            }
            catch (TaskCanceledException)
            {
                break;
            }
            //now randomly generate a new state and submit it to the tester class
            UserState? status;
            var nbStates = Enum.GetValues(typeof(UserState)).Length;
            if (user.CurrentStatus == null)
            {
                var newInt = rand.Next(nbStates);
                status = (UserState)newInt;
            }
            else
            {
                do
                {
                    var newInt = rand.Next(nbStates);
                    status = (UserState)newInt;
                }
                while (status == user.CurrentStatus);
            }
            _ = sendUserStatus(user, status.Value);
        }
    }

    private async Task sendUserStatus(User user, UserState status)
    {
        await consumer.ProcessStateChange(user.UserId).ConfigureAwait(false);
    }

    private void generateTestObjects(int nbUsers, int nbTeams, bool addAllUsersToTeams = false)
    {
        users = new List<User>();
        for (int i = 0; i < nbUsers; i++)
        {
            var usr = new User
            {
                UserId = $"User_{i}",
                Groups = new List<Team>()
            };
            users.Add(usr);
        }
    }
}

它使用class开头的变量来控制测试。您可以定义用户数量(nbOfusers - 每个用户都是生产新数据的生产者),一个用户生产下一个数据之间的最小延迟(minimumDelayBetweenTest)和最大延迟(maximumDelayBetweenTests)数据以及消费者处理数据需要多长时间 (operationDuration)。

StartTests 开始实际测试,StopTests 再次停止测试。

我这样称呼它们

static void Main(string[] args)
    {
        var tester = new SimpleProducer();
        Console.WriteLine("Test successfully started, type exit to stop");
        string str;
        do
        {
            str = Console.ReadLine();
            if (str == "start")
                tester.StartTests();
            else if (str == "stop")
                tester.StopTests();
        }
        while (str != "exit");
        tester.Shutdown();
    }

因此,如果我 运行 我的测试仪并键入 'start',Producer class 开始生成由 Consumer 消耗的状态。内存使用量开始增长、增长和增长。该示例配置到极端,我正在处理的真实场景不太密集,但生产者的一个动作可能会触发消费者端的多个动作,这些动作也必须以相同的异步中止 fifo 方式执行 -所以最坏的情况是,生成的一组数据会触发约 10 个消费者的操作(为简洁起见,我删除了最后一部分)。

当我有 100 个生产者时,每个生产者每 1-6 秒产生一个新数据项(随机地,数据产生也是随机的)。使用数据需要 3 秒.. 所以在很多情况下,在旧数据得到正确处理之前就有了一组新数据。

查看两个连续的内存转储,很明显内存使用的来源..都是与队列有关的碎片。鉴于我正在处理每个 TaskCancellationSource 并且不保留对生成的数据的任何引用(以及它们放入的 AsyncWorkItem),我无法解释为什么这会不断消耗我的记忆并且我'我希望其他人可以告诉我我的错误。您也可以通过输入 'stop' 来中止测试。您会看到内存不再被占用,但即使您暂停并触发 GC,内存也不会被释放。

运行nable 形式的项目源代码在Github。启动之后,你必须在控制台中输入start(加回车)来告诉生产者开始生产数据。您可以通过键入 stop(加回车)

来停止生成数据

您的代码问题太多,无法通过调试找到漏洞。但这里有几件事已经是一个问题,应该首先解决:

看起来 getQueue 每次调用 processUseStateUpdateAsync 时都会为同一用户创建一个新队列,并且不会重用现有队列:

var executor = groupStateChangeExecutors.GetOrAdd(user.UserId, getQueue());

CancellationTokenSource 在每次调用下面的代码时都会泄漏,因为每次调用方法 AddOrUpdate 时都会创建新值,因此不应以这种方式传递到那里:

userStateChangeAborters.AddOrUpdate(user.UserId, new CancellationTokenSource(), (key, existingValue

此外,如果字典没有特定 user.UserId:

的值,下面的代码也应该使用与您传递的相同的 cts 作为新的 cts
return new CancellationTokenSource();

还有一个潜在的 cancelSource 变量泄漏,因为它被绑定到一个可以活得比你想要的时间更长的委托,最好在那里传递具体的 CancellationToken:

executor.EnqueueTask(() => processUserStateUpdateAsync(user, state, previousState,
                    cancelSource.Token));

出于某种原因,您没有在此处和其他地方放置 aborter

userStateChangeAborters.TryRemove(user.UserId, out var aborter);

创建 Channel 可能有潜在的泄漏:

taskQueue = Channel.CreateBounded<AsyncWorkItem<T>>(new BoundedChannelOptions(1)

您选择了选项 FullMode = BoundedChannelFullMode.DropOldest,它应该删除最旧的值(如果有的话),所以我假设这会停止处理排队的项目,因为它们不会被读取。这是一个假设,但我假设如果一个旧项目在没有被处理的情况下被删除,那么 processUserStateUpdateAsync 将不会被调用并且所有资源都不会被释放。

您可以从这些发现的问题入手,之后应该更容易找到真正的原因。