从 producer/consumer 队列中删除已取消的任务
Remove cancelled Task from producer/consumer queue
我想使用异步 producer/consumer 队列(AsyncEx 库)通过总线一次发送一条消息。现在我只是通过异步阻塞来实现这一点。它工作正常,但我无法控制队列:(
所以我想到了以下解决方案,问题是取消的任务不会从队列中删除。如果我将队列限制为 10(因为每条消息需要 1 秒发送,最大队列时间应为 10 秒左右)并且队列中已经包含 8 个等待任务和 2 个已取消任务,那么下一个排队任务将抛出 InvalidOperationException,尽管无论如何都不会发送两个已取消的任务。
也许有更好的方法来做到这一点 :D
class Program
{
static AsyncProducerConsumerQueue<Tuple<string, TaskCompletionSource>> s_Queue =
new AsyncProducerConsumerQueue<Tuple<string, TaskCompletionSource>>();
static void Main()
{
StartAsync().Wait();
}
static async Task StartAsync()
{
var sendingTask = StartSendingAsync();
var tasks = new List<Task>();
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(8)))
{
for (var i = 0; i < 10; i++)
{
tasks.Add(EnqueueMessageAsync("Message " + i, cts.Token));
}
try
{
await Task.WhenAll(tasks);
Console.WriteLine("All messages sent.");
}
catch (TaskCanceledException)
{
Console.WriteLine("At least one task was canceled.");
}
}
s_Queue.CompleteAdding();
await sendingTask;
s_Queue.Dispose();
Console.WriteLine("Queue completed.");
Console.WriteLine("Press any key to continue...");
Console.ReadKey();
}
static async Task EnqueueMessageAsync(string message, CancellationToken token)
{
var tcs = new TaskCompletionSource();
using (token.Register(() => tcs.TrySetCanceled()))
{
await s_Queue.EnqueueAsync(new Tuple<string, TaskCompletionSource>(message, tcs));
Console.WriteLine("Thread '{0}' - {1}: {2} queued.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message);
await tcs.Task;
}
}
static async Task SendMessageAsync(string message)
{
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine("Thread '{0}' - {1}: {2} sent.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message);
}
static async Task StartSendingAsync()
{
while (await s_Queue.OutputAvailableAsync())
{
var t = await s_Queue.DequeueAsync();
if (t.Item2.Task.IsCanceled || t.Item2.Task.IsFaulted) continue;
await SendMessageAsync(t.Item1);
t.Item2.TrySetResult();
}
}
}
编辑 1:
正如 svik 指出的那样,仅当队列已经完成时才会抛出 InvalidOperationException。所以这个解决方案甚至没有解决我最初的非托管 "queue" 等待任务问题。如果有例如超过 10 个 calls/10s 我得到了一个完整的队列和一个额外的非托管 "queue" 等待任务,就像我的异步阻塞方法 (AsyncMonitor)。我想我必须想出一些其他的解决方案......
编辑 2:
我有 N 个不同的消息生产者(我不知道有多少,因为这不是我的代码),只有一个消费者通过总线发送消息并检查它们是否正确发送(不是真正的字符串消息)。
以下代码模拟了代码应该中断的情况(队列大小为 10):
- 排队 10 条消息(超时 5 秒)
- 等待 5 秒(消息 0-4 已发送,消息 5-9 已取消)
- 排队 11 条新消息(w/o 超时)
- 消息 10 - 19 应该排队,因为队列只包含已取消的消息
- 消息 20 应该抛出异常(例如 QueueOverflowException),因为队列已满,这将由生产者代码处理或不处理
制作人:
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
{
for (var i = 0; i < 10; i++) { tasks.Add(EnqueueMessageAsync("Message " + i, cts.Token)); }
await Task.Delay(TimeSpan.FromSeconds(5));
for (var i = 10; i < 21; i++) { tasks.Add(EnqueueMessageAsync("Message " + i, default(CancellationToken))); }
try
{
await Task.WhenAll(tasks);
Console.WriteLine("All messages sent.");
}
catch (TaskCanceledException)
{
Console.WriteLine("At least one task was canceled.");
Console.WriteLine("Press any key to complete queue...");
Console.ReadKey();
}
}
目标是,我想完全控制所有应该发送的消息,但是在我之前发布的代码中并不是这样,因为我只能控制队列中的消息但是不是等待入队的消息(可能有 10000 条消息异步等待入队,我不知道 => 生产者代码无论如何都不会按预期工作,因为发送所有等待的消息需要很长时间...)
我希望这能让我更清楚我想要实现的目标 ;)
我不确定回答我自己的问题是否可以,所以我不会将其标记为答案,也许有人想出更好的解决方案:P
首先是生产者代码:
static async Task StartAsync()
{
using (var queue = new SendMessageQueue(10, new SendMessageService()))
using (var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(4.5)))
{
var tasks = new List<Task>();
for (var i = 0; i < 10; i++)
{
tasks.Add(queue.SendAsync(i.ToString(), timeoutTokenSource.Token));
}
await Task.Delay(TimeSpan.FromSeconds(4.5));
for (var i = 10; i < 25; i++)
{
tasks.Add(queue.SendAsync(i.ToString(), default(CancellationToken)));
}
await queue.CompleteSendingAsync();
for (var i = 0; i < tasks.Count; i++ )
{
try
{
await tasks[i];
Console.WriteLine("Message '{0}' send.", i);
}
catch (TaskCanceledException)
{
Console.WriteLine("Message '{0}' canceled.", i);
}
catch (QueueOverflowException ex)
{
Console.WriteLine(ex.Message);
}
}
}
Console.WriteLine("Press any key to continue...");
Console.ReadKey();
}
- 25 条消息在 5 秒内排队
- 发送了 16 条消息
- 3 条消息未发送(队列已满)
- 6 条消息被取消
这里是基于列表的 "Queue" class。它是队列和消费者的组合。同步是使用 AsyncMonitor class(Stephen Cleary 的 AsyncEx)完成的。
class SendMessageQueue : IDisposable
{
private bool m_Disposed;
private bool m_CompleteSending;
private Task m_SendingTask;
private AsyncMonitor m_Monitor;
private List<MessageTaskCompletionSource> m_MessageCollection;
private ISendMessageService m_SendMessageService;
public int Capacity { get; private set; }
public SendMessageQueue(int capacity, ISendMessageService service)
{
Capacity = capacity;
m_Monitor = new AsyncMonitor();
m_MessageCollection = new List<MessageTaskCompletionSource>();
m_SendMessageService = service;
m_SendingTask = StartSendingAsync();
}
public async Task<bool> SendAsync(string message, CancellationToken token)
{
if (m_Disposed) { throw new ObjectDisposedException(GetType().Name); }
if (message == null) { throw new ArgumentNullException("message"); }
using (var messageTcs = new MessageTaskCompletionSource(message, token))
{
await AddAsync(messageTcs);
return await messageTcs.Task;
}
}
public async Task CompleteSendingAsync()
{
if (m_Disposed) { throw new ObjectDisposedException(GetType().Name); }
using (m_Monitor.Enter())
{
m_CompleteSending = true;
}
await m_SendingTask;
}
private async Task AddAsync(MessageTaskCompletionSource message)
{
using (await m_Monitor.EnterAsync(message.Token))
{
if (m_CompleteSending) { throw new InvalidOperationException("Queue already completed."); }
if (Capacity < m_MessageCollection.Count)
{
m_MessageCollection.RemoveAll(item => item.IsCanceled);
if (Capacity < m_MessageCollection.Count)
{
throw new QueueOverflowException(string.Format("Queue overflow; '{0}' couldn't be enqueued.", message.Message));
}
}
m_MessageCollection.Add(message);
}
m_Monitor.Pulse(); // signal new message
Console.WriteLine("Thread '{0}' - {1}: '{2}' enqueued.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message.Message);
}
private async Task<MessageTaskCompletionSource> TakeAsync()
{
using (await m_Monitor.EnterAsync())
{
var message = m_MessageCollection.ElementAt(0);
m_MessageCollection.RemoveAt(0);
return message;
}
}
private async Task<bool> OutputAvailableAsync()
{
using (await m_Monitor.EnterAsync())
{
if (m_MessageCollection.Count > 0) { return true; }
else if (m_CompleteSending) { return false; }
await m_Monitor.WaitAsync();
return true;
}
}
private async Task StartSendingAsync()
{
while (await OutputAvailableAsync())
{
var message = await TakeAsync();
if (message.IsCanceled) continue;
try
{
var result = await m_SendMessageService.SendMessageAsync(message.Message, message.Token);
message.TrySetResult(result);
}
catch (TaskCanceledException) { message.TrySetCanceled(); }
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected void Dispose(bool disposing)
{
if (m_Disposed) return;
if (disposing)
{
if (m_MessageCollection != null)
{
var tmp = m_MessageCollection;
m_MessageCollection = null;
tmp.ForEach(item => item.Dispose());
tmp.Clear();
}
}
m_Disposed = true;
}
#region MessageTaskCompletionSource Class
class MessageTaskCompletionSource : TaskCompletionSource<bool>, IDisposable
{
private bool m_Disposed;
private IDisposable m_CancellationTokenRegistration;
public string Message { get; private set; }
public CancellationToken Token { get; private set; }
public bool IsCompleted { get { return Task.IsCompleted; } }
public bool IsCanceled { get { return Task.IsCanceled; } }
public bool IsFaulted { get { return Task.IsFaulted; } }
public MessageTaskCompletionSource(string message, CancellationToken token)
{
m_CancellationTokenRegistration = token.Register(() => TrySetCanceled());
Message = message;
Token = token;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected void Dispose(bool disposing)
{
if (m_Disposed) return;
if (disposing)
{
TrySetException(new ObjectDisposedException(GetType().Name));
if (m_CancellationTokenRegistration != null)
{
var tmp = m_CancellationTokenRegistration;
m_CancellationTokenRegistration = null;
tmp.Dispose();
}
}
m_Disposed = true;
}
}
#endregion
}
目前我对这个解决方案没问题;它完成了工作 :D
我想使用异步 producer/consumer 队列(AsyncEx 库)通过总线一次发送一条消息。现在我只是通过异步阻塞来实现这一点。它工作正常,但我无法控制队列:(
所以我想到了以下解决方案,问题是取消的任务不会从队列中删除。如果我将队列限制为 10(因为每条消息需要 1 秒发送,最大队列时间应为 10 秒左右)并且队列中已经包含 8 个等待任务和 2 个已取消任务,那么下一个排队任务将抛出 InvalidOperationException,尽管无论如何都不会发送两个已取消的任务。
也许有更好的方法来做到这一点 :D
class Program
{
static AsyncProducerConsumerQueue<Tuple<string, TaskCompletionSource>> s_Queue =
new AsyncProducerConsumerQueue<Tuple<string, TaskCompletionSource>>();
static void Main()
{
StartAsync().Wait();
}
static async Task StartAsync()
{
var sendingTask = StartSendingAsync();
var tasks = new List<Task>();
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(8)))
{
for (var i = 0; i < 10; i++)
{
tasks.Add(EnqueueMessageAsync("Message " + i, cts.Token));
}
try
{
await Task.WhenAll(tasks);
Console.WriteLine("All messages sent.");
}
catch (TaskCanceledException)
{
Console.WriteLine("At least one task was canceled.");
}
}
s_Queue.CompleteAdding();
await sendingTask;
s_Queue.Dispose();
Console.WriteLine("Queue completed.");
Console.WriteLine("Press any key to continue...");
Console.ReadKey();
}
static async Task EnqueueMessageAsync(string message, CancellationToken token)
{
var tcs = new TaskCompletionSource();
using (token.Register(() => tcs.TrySetCanceled()))
{
await s_Queue.EnqueueAsync(new Tuple<string, TaskCompletionSource>(message, tcs));
Console.WriteLine("Thread '{0}' - {1}: {2} queued.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message);
await tcs.Task;
}
}
static async Task SendMessageAsync(string message)
{
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine("Thread '{0}' - {1}: {2} sent.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message);
}
static async Task StartSendingAsync()
{
while (await s_Queue.OutputAvailableAsync())
{
var t = await s_Queue.DequeueAsync();
if (t.Item2.Task.IsCanceled || t.Item2.Task.IsFaulted) continue;
await SendMessageAsync(t.Item1);
t.Item2.TrySetResult();
}
}
}
编辑 1:
正如 svik 指出的那样,仅当队列已经完成时才会抛出 InvalidOperationException。所以这个解决方案甚至没有解决我最初的非托管 "queue" 等待任务问题。如果有例如超过 10 个 calls/10s 我得到了一个完整的队列和一个额外的非托管 "queue" 等待任务,就像我的异步阻塞方法 (AsyncMonitor)。我想我必须想出一些其他的解决方案......
编辑 2:
我有 N 个不同的消息生产者(我不知道有多少,因为这不是我的代码),只有一个消费者通过总线发送消息并检查它们是否正确发送(不是真正的字符串消息)。
以下代码模拟了代码应该中断的情况(队列大小为 10):
- 排队 10 条消息(超时 5 秒)
- 等待 5 秒(消息 0-4 已发送,消息 5-9 已取消)
- 排队 11 条新消息(w/o 超时)
- 消息 10 - 19 应该排队,因为队列只包含已取消的消息
- 消息 20 应该抛出异常(例如 QueueOverflowException),因为队列已满,这将由生产者代码处理或不处理
制作人:
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)))
{
for (var i = 0; i < 10; i++) { tasks.Add(EnqueueMessageAsync("Message " + i, cts.Token)); }
await Task.Delay(TimeSpan.FromSeconds(5));
for (var i = 10; i < 21; i++) { tasks.Add(EnqueueMessageAsync("Message " + i, default(CancellationToken))); }
try
{
await Task.WhenAll(tasks);
Console.WriteLine("All messages sent.");
}
catch (TaskCanceledException)
{
Console.WriteLine("At least one task was canceled.");
Console.WriteLine("Press any key to complete queue...");
Console.ReadKey();
}
}
目标是,我想完全控制所有应该发送的消息,但是在我之前发布的代码中并不是这样,因为我只能控制队列中的消息但是不是等待入队的消息(可能有 10000 条消息异步等待入队,我不知道 => 生产者代码无论如何都不会按预期工作,因为发送所有等待的消息需要很长时间...)
我希望这能让我更清楚我想要实现的目标 ;)
我不确定回答我自己的问题是否可以,所以我不会将其标记为答案,也许有人想出更好的解决方案:P
首先是生产者代码:
static async Task StartAsync()
{
using (var queue = new SendMessageQueue(10, new SendMessageService()))
using (var timeoutTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(4.5)))
{
var tasks = new List<Task>();
for (var i = 0; i < 10; i++)
{
tasks.Add(queue.SendAsync(i.ToString(), timeoutTokenSource.Token));
}
await Task.Delay(TimeSpan.FromSeconds(4.5));
for (var i = 10; i < 25; i++)
{
tasks.Add(queue.SendAsync(i.ToString(), default(CancellationToken)));
}
await queue.CompleteSendingAsync();
for (var i = 0; i < tasks.Count; i++ )
{
try
{
await tasks[i];
Console.WriteLine("Message '{0}' send.", i);
}
catch (TaskCanceledException)
{
Console.WriteLine("Message '{0}' canceled.", i);
}
catch (QueueOverflowException ex)
{
Console.WriteLine(ex.Message);
}
}
}
Console.WriteLine("Press any key to continue...");
Console.ReadKey();
}
- 25 条消息在 5 秒内排队
- 发送了 16 条消息
- 3 条消息未发送(队列已满)
- 6 条消息被取消
这里是基于列表的 "Queue" class。它是队列和消费者的组合。同步是使用 AsyncMonitor class(Stephen Cleary 的 AsyncEx)完成的。
class SendMessageQueue : IDisposable
{
private bool m_Disposed;
private bool m_CompleteSending;
private Task m_SendingTask;
private AsyncMonitor m_Monitor;
private List<MessageTaskCompletionSource> m_MessageCollection;
private ISendMessageService m_SendMessageService;
public int Capacity { get; private set; }
public SendMessageQueue(int capacity, ISendMessageService service)
{
Capacity = capacity;
m_Monitor = new AsyncMonitor();
m_MessageCollection = new List<MessageTaskCompletionSource>();
m_SendMessageService = service;
m_SendingTask = StartSendingAsync();
}
public async Task<bool> SendAsync(string message, CancellationToken token)
{
if (m_Disposed) { throw new ObjectDisposedException(GetType().Name); }
if (message == null) { throw new ArgumentNullException("message"); }
using (var messageTcs = new MessageTaskCompletionSource(message, token))
{
await AddAsync(messageTcs);
return await messageTcs.Task;
}
}
public async Task CompleteSendingAsync()
{
if (m_Disposed) { throw new ObjectDisposedException(GetType().Name); }
using (m_Monitor.Enter())
{
m_CompleteSending = true;
}
await m_SendingTask;
}
private async Task AddAsync(MessageTaskCompletionSource message)
{
using (await m_Monitor.EnterAsync(message.Token))
{
if (m_CompleteSending) { throw new InvalidOperationException("Queue already completed."); }
if (Capacity < m_MessageCollection.Count)
{
m_MessageCollection.RemoveAll(item => item.IsCanceled);
if (Capacity < m_MessageCollection.Count)
{
throw new QueueOverflowException(string.Format("Queue overflow; '{0}' couldn't be enqueued.", message.Message));
}
}
m_MessageCollection.Add(message);
}
m_Monitor.Pulse(); // signal new message
Console.WriteLine("Thread '{0}' - {1}: '{2}' enqueued.", Thread.CurrentThread.ManagedThreadId, DateTime.Now.TimeOfDay, message.Message);
}
private async Task<MessageTaskCompletionSource> TakeAsync()
{
using (await m_Monitor.EnterAsync())
{
var message = m_MessageCollection.ElementAt(0);
m_MessageCollection.RemoveAt(0);
return message;
}
}
private async Task<bool> OutputAvailableAsync()
{
using (await m_Monitor.EnterAsync())
{
if (m_MessageCollection.Count > 0) { return true; }
else if (m_CompleteSending) { return false; }
await m_Monitor.WaitAsync();
return true;
}
}
private async Task StartSendingAsync()
{
while (await OutputAvailableAsync())
{
var message = await TakeAsync();
if (message.IsCanceled) continue;
try
{
var result = await m_SendMessageService.SendMessageAsync(message.Message, message.Token);
message.TrySetResult(result);
}
catch (TaskCanceledException) { message.TrySetCanceled(); }
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected void Dispose(bool disposing)
{
if (m_Disposed) return;
if (disposing)
{
if (m_MessageCollection != null)
{
var tmp = m_MessageCollection;
m_MessageCollection = null;
tmp.ForEach(item => item.Dispose());
tmp.Clear();
}
}
m_Disposed = true;
}
#region MessageTaskCompletionSource Class
class MessageTaskCompletionSource : TaskCompletionSource<bool>, IDisposable
{
private bool m_Disposed;
private IDisposable m_CancellationTokenRegistration;
public string Message { get; private set; }
public CancellationToken Token { get; private set; }
public bool IsCompleted { get { return Task.IsCompleted; } }
public bool IsCanceled { get { return Task.IsCanceled; } }
public bool IsFaulted { get { return Task.IsFaulted; } }
public MessageTaskCompletionSource(string message, CancellationToken token)
{
m_CancellationTokenRegistration = token.Register(() => TrySetCanceled());
Message = message;
Token = token;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected void Dispose(bool disposing)
{
if (m_Disposed) return;
if (disposing)
{
TrySetException(new ObjectDisposedException(GetType().Name));
if (m_CancellationTokenRegistration != null)
{
var tmp = m_CancellationTokenRegistration;
m_CancellationTokenRegistration = null;
tmp.Dispose();
}
}
m_Disposed = true;
}
}
#endregion
}
目前我对这个解决方案没问题;它完成了工作 :D