C# Abortable Asynchronous Fifo Queue - 泄漏大量内存
C# Abortable Asynchronous Fifo Queue - leaking massive amounts of memory
我需要以 FIFO 方式处理来自生产者的数据,并且能够在同一生产者产生新的数据位时中止处理。
所以我基于 Stephen Cleary 的 AsyncCollection
(在我的示例中称为 AsyncCollectionAbortableFifoQueue
)和 TPL 的 BufferBlock
(在我的示例中称为 BufferBlockAbortableAsyncFifoQueue
)实现了一个可中止的 FIFO 队列).这是基于 AsyncCollection
的实现
public class AsyncCollectionAbortableFifoQueue<T> : IExecutableAsyncFifoQueue<T>
{
private AsyncCollection<AsyncWorkItem<T>> taskQueue = new AsyncCollection<AsyncWorkItem<T>>();
private readonly CancellationToken stopProcessingToken;
public AsyncCollectionAbortableFifoQueue(CancellationToken cancelToken)
{
stopProcessingToken = cancelToken;
_ = processQueuedItems();
}
public Task<T> EnqueueTask(Func<Task<T>> action, CancellationToken? cancelToken)
{
var tcs = new TaskCompletionSource<T>();
var item = new AsyncWorkItem<T>(tcs, action, cancelToken);
taskQueue.Add(item);
return tcs.Task;
}
protected virtual async Task processQueuedItems()
{
while (!stopProcessingToken.IsCancellationRequested)
{
try
{
var item = await taskQueue.TakeAsync(stopProcessingToken).ConfigureAwait(false);
if (item.CancelToken.HasValue && item.CancelToken.Value.IsCancellationRequested)
item.TaskSource.SetCanceled();
else
{
try
{
T result = await item.Action().ConfigureAwait(false);
item.TaskSource.SetResult(result); // Indicate completion
}
catch (Exception ex)
{
if (ex is OperationCanceledException && ((OperationCanceledException)ex).CancellationToken == item.CancelToken)
item.TaskSource.SetCanceled();
item.TaskSource.SetException(ex);
}
}
}
catch (Exception) { }
}
}
}
public interface IExecutableAsyncFifoQueue<T>
{
Task<T> EnqueueTask(Func<Task<T>> action, CancellationToken? cancelToken);
}
processQueuedItems
是将 AsyncWorkItem
从队列中取出并执行它们的任务,除非已请求取消。
要执行的异步操作被包装到一个 AsyncWorkItem
中,看起来像这样
internal class AsyncWorkItem<T>
{
public readonly TaskCompletionSource<T> TaskSource;
public readonly Func<Task<T>> Action;
public readonly CancellationToken? CancelToken;
public AsyncWorkItem(TaskCompletionSource<T> taskSource, Func<Task<T>> action, CancellationToken? cancelToken)
{
TaskSource = taskSource;
Action = action;
CancelToken = cancelToken;
}
}
然后有一个任务查找项目并将其出列以进行处理,或者处理它们,或者如果 CancellationToken
已被触发则中止。
一切正常 - 数据得到处理,如果收到新数据,旧数据的处理将中止。我现在的问题是,如果我提高使用率(生产者生产的产品比消费者生产的产品多得多),这些队列会泄漏大量内存。鉴于它是可中止的,未处理的数据应该被丢弃并最终从内存中消失。
那么让我们看看我是如何使用这些队列的。我有 1:1 生产者和消费者匹配。每个消费者处理单个生产者的数据。每当我得到一个新的数据项,并且它与前一个不匹配时,我就会捕获给定生产者的队列(User.UserId)或创建一个新的(代码片段中的 'executor') .然后我有一个 ConcurrentDictionary
每个 producer/consumer 组合包含一个 CancellationTokenSource
。如果前面有一个 CancellationTokenSource
,我会在它上面调用 Cancel
,并在 20 秒后调用 Dispose
(立即处理会导致队列中出现异常)。然后我对新数据进行排队处理。队列 return 给我一个我可以等待的任务,所以我知道数据处理何时完成,然后我 return 结果。
这是代码
internal class SimpleLeakyConsumer
{
private ConcurrentDictionary<string, IExecutableAsyncFifoQueue<bool>> groupStateChangeExecutors = new ConcurrentDictionary<string, IExecutableAsyncFifoQueue<bool>>();
private readonly ConcurrentDictionary<string, CancellationTokenSource> userStateChangeAborters = new ConcurrentDictionary<string, CancellationTokenSource>();
protected CancellationTokenSource serverShutDownSource;
private readonly int operationDuration = 1000;
internal SimpleLeakyConsumer(CancellationTokenSource serverShutDownSource, int operationDuration)
{
this.serverShutDownSource = serverShutDownSource;
this.operationDuration = operationDuration * 1000; // convert from seconds to milliseconds
}
internal async Task<bool> ProcessStateChange(string userId)
{
var executor = groupStateChangeExecutors.GetOrAdd(userId, new AsyncCollectionAbortableFifoQueue<bool>(serverShutDownSource.Token));
CancellationTokenSource oldSource = null;
using (var cancelSource = userStateChangeAborters.AddOrUpdate(userId, new CancellationTokenSource(), (key, existingValue) =>
{
oldSource = existingValue;
return new CancellationTokenSource();
}))
{
if (oldSource != null && !oldSource.IsCancellationRequested)
{
oldSource.Cancel();
_ = delayedDispose(oldSource);
}
try
{
var executionTask = executor.EnqueueTask(async () => { await Task.Delay(operationDuration, cancelSource.Token).ConfigureAwait(false); return true; }, cancelSource.Token);
var result = await executionTask.ConfigureAwait(false);
userStateChangeAborters.TryRemove(userId, out var aborter);
return result;
}
catch (Exception e)
{
if (e is TaskCanceledException || e is OperationCanceledException)
return true;
else
{
userStateChangeAborters.TryRemove(userId, out var aborter);
return false;
}
}
}
}
private async Task delayedDispose(CancellationTokenSource src)
{
try
{
await Task.Delay(20 * 1000).ConfigureAwait(false);
}
finally
{
try
{
src.Dispose();
}
catch (ObjectDisposedException) { }
}
}
}
在此示例实现中,所做的只是等待,然后 return 为真。
为了测试这个机制,我写了下面的数据生产者class:
internal class SimpleProducer
{
//variables defining the test
readonly int nbOfusers = 10;
readonly int minimumDelayBetweenTest = 1; // seconds
readonly int maximumDelayBetweenTests = 6; // seconds
readonly int operationDuration = 3; // number of seconds an operation takes in the tester
private readonly Random rand;
private List<User> users;
private readonly SimpleLeakyConsumer consumer;
protected CancellationTokenSource serverShutDownSource, testAbortSource;
private CancellationToken internalToken = CancellationToken.None;
internal SimpleProducer()
{
rand = new Random();
testAbortSource = new CancellationTokenSource();
serverShutDownSource = new CancellationTokenSource();
generateTestObjects(nbOfusers, 0, false);
consumer = new SimpleLeakyConsumer(serverShutDownSource, operationDuration);
}
internal void StartTests()
{
if (internalToken == CancellationToken.None || internalToken.IsCancellationRequested)
{
internalToken = testAbortSource.Token;
foreach (var user in users)
_ = setNewUserPresence(internalToken, user);
}
}
internal void StopTests()
{
testAbortSource.Cancel();
try
{
testAbortSource.Dispose();
}
catch (ObjectDisposedException) { }
testAbortSource = new CancellationTokenSource();
}
internal void Shutdown()
{
serverShutDownSource.Cancel();
}
private async Task setNewUserPresence(CancellationToken token, User user)
{
while (!token.IsCancellationRequested)
{
var nextInterval = rand.Next(minimumDelayBetweenTest, maximumDelayBetweenTests);
try
{
await Task.Delay(nextInterval * 1000, testAbortSource.Token).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
break;
}
//now randomly generate a new state and submit it to the tester class
UserState? status;
var nbStates = Enum.GetValues(typeof(UserState)).Length;
if (user.CurrentStatus == null)
{
var newInt = rand.Next(nbStates);
status = (UserState)newInt;
}
else
{
do
{
var newInt = rand.Next(nbStates);
status = (UserState)newInt;
}
while (status == user.CurrentStatus);
}
_ = sendUserStatus(user, status.Value);
}
}
private async Task sendUserStatus(User user, UserState status)
{
await consumer.ProcessStateChange(user.UserId).ConfigureAwait(false);
}
private void generateTestObjects(int nbUsers, int nbTeams, bool addAllUsersToTeams = false)
{
users = new List<User>();
for (int i = 0; i < nbUsers; i++)
{
var usr = new User
{
UserId = $"User_{i}",
Groups = new List<Team>()
};
users.Add(usr);
}
}
}
它使用class开头的变量来控制测试。您可以定义用户数量(nbOfusers
- 每个用户都是生产新数据的生产者),一个用户生产下一个数据之间的最小延迟(minimumDelayBetweenTest
)和最大延迟(maximumDelayBetweenTests
)数据以及消费者处理数据需要多长时间 (operationDuration
)。
StartTests
开始实际测试,StopTests
再次停止测试。
我这样称呼它们
static void Main(string[] args)
{
var tester = new SimpleProducer();
Console.WriteLine("Test successfully started, type exit to stop");
string str;
do
{
str = Console.ReadLine();
if (str == "start")
tester.StartTests();
else if (str == "stop")
tester.StopTests();
}
while (str != "exit");
tester.Shutdown();
}
因此,如果我 运行 我的测试仪并键入 'start',Producer
class 开始生成由 Consumer
消耗的状态。内存使用量开始增长、增长和增长。该示例配置到极端,我正在处理的真实场景不太密集,但生产者的一个动作可能会触发消费者端的多个动作,这些动作也必须以相同的异步中止 fifo 方式执行 -所以最坏的情况是,生成的一组数据会触发约 10 个消费者的操作(为简洁起见,我删除了最后一部分)。
当我有 100 个生产者时,每个生产者每 1-6 秒产生一个新数据项(随机地,数据产生也是随机的)。使用数据需要 3 秒.. 所以在很多情况下,在旧数据得到正确处理之前就有了一组新数据。
查看两个连续的内存转储,很明显内存使用的来源..都是与队列有关的碎片。鉴于我正在处理每个 TaskCancellationSource 并且不保留对生成的数据的任何引用(以及它们放入的 AsyncWorkItem
),我无法解释为什么这会不断消耗我的记忆并且我'我希望其他人可以告诉我我的错误。您也可以通过输入 'stop' 来中止测试。您会看到内存不再被占用,但即使您暂停并触发 GC,内存也不会被释放。
运行nable 形式的项目源代码在Github。启动之后,你必须在控制台中输入start
(加回车)来告诉生产者开始生产数据。您可以通过键入 stop
(加回车)
来停止生成数据
您的代码问题太多,无法通过调试找到漏洞。但这里有几件事已经是一个问题,应该首先解决:
看起来 getQueue
每次调用 processUseStateUpdateAsync
时都会为同一用户创建一个新队列,并且不会重用现有队列:
var executor = groupStateChangeExecutors.GetOrAdd(user.UserId, getQueue());
CancellationTokenSource
在每次调用下面的代码时都会泄漏,因为每次调用方法 AddOrUpdate
时都会创建新值,因此不应以这种方式传递到那里:
userStateChangeAborters.AddOrUpdate(user.UserId, new CancellationTokenSource(), (key, existingValue
此外,如果字典没有特定 user.UserId
:
的值,下面的代码也应该使用与您传递的相同的 cts 作为新的 cts
return new CancellationTokenSource();
还有一个潜在的 cancelSource
变量泄漏,因为它被绑定到一个可以活得比你想要的时间更长的委托,最好在那里传递具体的 CancellationToken
:
executor.EnqueueTask(() => processUserStateUpdateAsync(user, state, previousState,
cancelSource.Token));
出于某种原因,您没有在此处和其他地方放置 aborter
:
userStateChangeAborters.TryRemove(user.UserId, out var aborter);
创建 Channel
可能有潜在的泄漏:
taskQueue = Channel.CreateBounded<AsyncWorkItem<T>>(new BoundedChannelOptions(1)
您选择了选项 FullMode = BoundedChannelFullMode.DropOldest
,它应该删除最旧的值(如果有的话),所以我假设这会停止处理排队的项目,因为它们不会被读取。这是一个假设,但我假设如果一个旧项目在没有被处理的情况下被删除,那么 processUserStateUpdateAsync
将不会被调用并且所有资源都不会被释放。
您可以从这些发现的问题入手,之后应该更容易找到真正的原因。
我需要以 FIFO 方式处理来自生产者的数据,并且能够在同一生产者产生新的数据位时中止处理。
所以我基于 Stephen Cleary 的 AsyncCollection
(在我的示例中称为 AsyncCollectionAbortableFifoQueue
)和 TPL 的 BufferBlock
(在我的示例中称为 BufferBlockAbortableAsyncFifoQueue
)实现了一个可中止的 FIFO 队列).这是基于 AsyncCollection
public class AsyncCollectionAbortableFifoQueue<T> : IExecutableAsyncFifoQueue<T>
{
private AsyncCollection<AsyncWorkItem<T>> taskQueue = new AsyncCollection<AsyncWorkItem<T>>();
private readonly CancellationToken stopProcessingToken;
public AsyncCollectionAbortableFifoQueue(CancellationToken cancelToken)
{
stopProcessingToken = cancelToken;
_ = processQueuedItems();
}
public Task<T> EnqueueTask(Func<Task<T>> action, CancellationToken? cancelToken)
{
var tcs = new TaskCompletionSource<T>();
var item = new AsyncWorkItem<T>(tcs, action, cancelToken);
taskQueue.Add(item);
return tcs.Task;
}
protected virtual async Task processQueuedItems()
{
while (!stopProcessingToken.IsCancellationRequested)
{
try
{
var item = await taskQueue.TakeAsync(stopProcessingToken).ConfigureAwait(false);
if (item.CancelToken.HasValue && item.CancelToken.Value.IsCancellationRequested)
item.TaskSource.SetCanceled();
else
{
try
{
T result = await item.Action().ConfigureAwait(false);
item.TaskSource.SetResult(result); // Indicate completion
}
catch (Exception ex)
{
if (ex is OperationCanceledException && ((OperationCanceledException)ex).CancellationToken == item.CancelToken)
item.TaskSource.SetCanceled();
item.TaskSource.SetException(ex);
}
}
}
catch (Exception) { }
}
}
}
public interface IExecutableAsyncFifoQueue<T>
{
Task<T> EnqueueTask(Func<Task<T>> action, CancellationToken? cancelToken);
}
processQueuedItems
是将 AsyncWorkItem
从队列中取出并执行它们的任务,除非已请求取消。
要执行的异步操作被包装到一个 AsyncWorkItem
中,看起来像这样
internal class AsyncWorkItem<T>
{
public readonly TaskCompletionSource<T> TaskSource;
public readonly Func<Task<T>> Action;
public readonly CancellationToken? CancelToken;
public AsyncWorkItem(TaskCompletionSource<T> taskSource, Func<Task<T>> action, CancellationToken? cancelToken)
{
TaskSource = taskSource;
Action = action;
CancelToken = cancelToken;
}
}
然后有一个任务查找项目并将其出列以进行处理,或者处理它们,或者如果 CancellationToken
已被触发则中止。
一切正常 - 数据得到处理,如果收到新数据,旧数据的处理将中止。我现在的问题是,如果我提高使用率(生产者生产的产品比消费者生产的产品多得多),这些队列会泄漏大量内存。鉴于它是可中止的,未处理的数据应该被丢弃并最终从内存中消失。
那么让我们看看我是如何使用这些队列的。我有 1:1 生产者和消费者匹配。每个消费者处理单个生产者的数据。每当我得到一个新的数据项,并且它与前一个不匹配时,我就会捕获给定生产者的队列(User.UserId)或创建一个新的(代码片段中的 'executor') .然后我有一个 ConcurrentDictionary
每个 producer/consumer 组合包含一个 CancellationTokenSource
。如果前面有一个 CancellationTokenSource
,我会在它上面调用 Cancel
,并在 20 秒后调用 Dispose
(立即处理会导致队列中出现异常)。然后我对新数据进行排队处理。队列 return 给我一个我可以等待的任务,所以我知道数据处理何时完成,然后我 return 结果。
这是代码
internal class SimpleLeakyConsumer
{
private ConcurrentDictionary<string, IExecutableAsyncFifoQueue<bool>> groupStateChangeExecutors = new ConcurrentDictionary<string, IExecutableAsyncFifoQueue<bool>>();
private readonly ConcurrentDictionary<string, CancellationTokenSource> userStateChangeAborters = new ConcurrentDictionary<string, CancellationTokenSource>();
protected CancellationTokenSource serverShutDownSource;
private readonly int operationDuration = 1000;
internal SimpleLeakyConsumer(CancellationTokenSource serverShutDownSource, int operationDuration)
{
this.serverShutDownSource = serverShutDownSource;
this.operationDuration = operationDuration * 1000; // convert from seconds to milliseconds
}
internal async Task<bool> ProcessStateChange(string userId)
{
var executor = groupStateChangeExecutors.GetOrAdd(userId, new AsyncCollectionAbortableFifoQueue<bool>(serverShutDownSource.Token));
CancellationTokenSource oldSource = null;
using (var cancelSource = userStateChangeAborters.AddOrUpdate(userId, new CancellationTokenSource(), (key, existingValue) =>
{
oldSource = existingValue;
return new CancellationTokenSource();
}))
{
if (oldSource != null && !oldSource.IsCancellationRequested)
{
oldSource.Cancel();
_ = delayedDispose(oldSource);
}
try
{
var executionTask = executor.EnqueueTask(async () => { await Task.Delay(operationDuration, cancelSource.Token).ConfigureAwait(false); return true; }, cancelSource.Token);
var result = await executionTask.ConfigureAwait(false);
userStateChangeAborters.TryRemove(userId, out var aborter);
return result;
}
catch (Exception e)
{
if (e is TaskCanceledException || e is OperationCanceledException)
return true;
else
{
userStateChangeAborters.TryRemove(userId, out var aborter);
return false;
}
}
}
}
private async Task delayedDispose(CancellationTokenSource src)
{
try
{
await Task.Delay(20 * 1000).ConfigureAwait(false);
}
finally
{
try
{
src.Dispose();
}
catch (ObjectDisposedException) { }
}
}
}
在此示例实现中,所做的只是等待,然后 return 为真。
为了测试这个机制,我写了下面的数据生产者class:
internal class SimpleProducer
{
//variables defining the test
readonly int nbOfusers = 10;
readonly int minimumDelayBetweenTest = 1; // seconds
readonly int maximumDelayBetweenTests = 6; // seconds
readonly int operationDuration = 3; // number of seconds an operation takes in the tester
private readonly Random rand;
private List<User> users;
private readonly SimpleLeakyConsumer consumer;
protected CancellationTokenSource serverShutDownSource, testAbortSource;
private CancellationToken internalToken = CancellationToken.None;
internal SimpleProducer()
{
rand = new Random();
testAbortSource = new CancellationTokenSource();
serverShutDownSource = new CancellationTokenSource();
generateTestObjects(nbOfusers, 0, false);
consumer = new SimpleLeakyConsumer(serverShutDownSource, operationDuration);
}
internal void StartTests()
{
if (internalToken == CancellationToken.None || internalToken.IsCancellationRequested)
{
internalToken = testAbortSource.Token;
foreach (var user in users)
_ = setNewUserPresence(internalToken, user);
}
}
internal void StopTests()
{
testAbortSource.Cancel();
try
{
testAbortSource.Dispose();
}
catch (ObjectDisposedException) { }
testAbortSource = new CancellationTokenSource();
}
internal void Shutdown()
{
serverShutDownSource.Cancel();
}
private async Task setNewUserPresence(CancellationToken token, User user)
{
while (!token.IsCancellationRequested)
{
var nextInterval = rand.Next(minimumDelayBetweenTest, maximumDelayBetweenTests);
try
{
await Task.Delay(nextInterval * 1000, testAbortSource.Token).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
break;
}
//now randomly generate a new state and submit it to the tester class
UserState? status;
var nbStates = Enum.GetValues(typeof(UserState)).Length;
if (user.CurrentStatus == null)
{
var newInt = rand.Next(nbStates);
status = (UserState)newInt;
}
else
{
do
{
var newInt = rand.Next(nbStates);
status = (UserState)newInt;
}
while (status == user.CurrentStatus);
}
_ = sendUserStatus(user, status.Value);
}
}
private async Task sendUserStatus(User user, UserState status)
{
await consumer.ProcessStateChange(user.UserId).ConfigureAwait(false);
}
private void generateTestObjects(int nbUsers, int nbTeams, bool addAllUsersToTeams = false)
{
users = new List<User>();
for (int i = 0; i < nbUsers; i++)
{
var usr = new User
{
UserId = $"User_{i}",
Groups = new List<Team>()
};
users.Add(usr);
}
}
}
它使用class开头的变量来控制测试。您可以定义用户数量(nbOfusers
- 每个用户都是生产新数据的生产者),一个用户生产下一个数据之间的最小延迟(minimumDelayBetweenTest
)和最大延迟(maximumDelayBetweenTests
)数据以及消费者处理数据需要多长时间 (operationDuration
)。
StartTests
开始实际测试,StopTests
再次停止测试。
我这样称呼它们
static void Main(string[] args)
{
var tester = new SimpleProducer();
Console.WriteLine("Test successfully started, type exit to stop");
string str;
do
{
str = Console.ReadLine();
if (str == "start")
tester.StartTests();
else if (str == "stop")
tester.StopTests();
}
while (str != "exit");
tester.Shutdown();
}
因此,如果我 运行 我的测试仪并键入 'start',Producer
class 开始生成由 Consumer
消耗的状态。内存使用量开始增长、增长和增长。该示例配置到极端,我正在处理的真实场景不太密集,但生产者的一个动作可能会触发消费者端的多个动作,这些动作也必须以相同的异步中止 fifo 方式执行 -所以最坏的情况是,生成的一组数据会触发约 10 个消费者的操作(为简洁起见,我删除了最后一部分)。
当我有 100 个生产者时,每个生产者每 1-6 秒产生一个新数据项(随机地,数据产生也是随机的)。使用数据需要 3 秒.. 所以在很多情况下,在旧数据得到正确处理之前就有了一组新数据。
查看两个连续的内存转储,很明显内存使用的来源..都是与队列有关的碎片。鉴于我正在处理每个 TaskCancellationSource 并且不保留对生成的数据的任何引用(以及它们放入的 AsyncWorkItem
),我无法解释为什么这会不断消耗我的记忆并且我'我希望其他人可以告诉我我的错误。您也可以通过输入 'stop' 来中止测试。您会看到内存不再被占用,但即使您暂停并触发 GC,内存也不会被释放。
运行nable 形式的项目源代码在Github。启动之后,你必须在控制台中输入start
(加回车)来告诉生产者开始生产数据。您可以通过键入 stop
(加回车)
您的代码问题太多,无法通过调试找到漏洞。但这里有几件事已经是一个问题,应该首先解决:
看起来 getQueue
每次调用 processUseStateUpdateAsync
时都会为同一用户创建一个新队列,并且不会重用现有队列:
var executor = groupStateChangeExecutors.GetOrAdd(user.UserId, getQueue());
CancellationTokenSource
在每次调用下面的代码时都会泄漏,因为每次调用方法 AddOrUpdate
时都会创建新值,因此不应以这种方式传递到那里:
userStateChangeAborters.AddOrUpdate(user.UserId, new CancellationTokenSource(), (key, existingValue
此外,如果字典没有特定 user.UserId
:
return new CancellationTokenSource();
还有一个潜在的 cancelSource
变量泄漏,因为它被绑定到一个可以活得比你想要的时间更长的委托,最好在那里传递具体的 CancellationToken
:
executor.EnqueueTask(() => processUserStateUpdateAsync(user, state, previousState,
cancelSource.Token));
出于某种原因,您没有在此处和其他地方放置 aborter
:
userStateChangeAborters.TryRemove(user.UserId, out var aborter);
创建 Channel
可能有潜在的泄漏:
taskQueue = Channel.CreateBounded<AsyncWorkItem<T>>(new BoundedChannelOptions(1)
您选择了选项 FullMode = BoundedChannelFullMode.DropOldest
,它应该删除最旧的值(如果有的话),所以我假设这会停止处理排队的项目,因为它们不会被读取。这是一个假设,但我假设如果一个旧项目在没有被处理的情况下被删除,那么 processUserStateUpdateAsync
将不会被调用并且所有资源都不会被释放。
您可以从这些发现的问题入手,之后应该更容易找到真正的原因。