多线程异步模式
Multithreaded async pattern
我有一个场景,其中多个线程通过单个套接字发送数据。已将唯一 ID 插入到消息中,并在响应消息中回显该唯一 ID。当套接字与单个客户端隔离时(正如预期的那样),一切都很好。现在我正在为客户端等待特定响应的多个线程寻找异步/等待模式。
一些代码来演示:
using (var ns = new NetworkStream(_socket))
{
byte[] requestBuffer = GetBuffer(request);
await ns.WriteAsync(requestBuffer, 0, request.Length);
byte[] responseBuffer = await ReceiveMessageAsync(ns);
// process response message
}
以上示例不适用于多线程场景,因为消息可以按任何顺序返回,因此下一条消息可能不属于当前客户端。我的想法是,客户端将使用其唯一 ID(将其存储在字典中)注册委托或任务,当消息返回时带有该唯一 ID,委托或任务将是 'completed' 和响应字节。我猜这会很容易用 EventHandler 实现,但我正在寻找一种等待响应的方法。
例如:
using (var ns = new CustomNetworkStream(_socket))
{
Task waitTask = ns.RegisterTask(this.UniqueId);
byte[] requestBuffer = GetBuffer(request);
await ns.WriteAsync(requestBuffer, 0, request.Length);
byte[] responseBuffer = await waitTask;
// process response message
}
我不知道"RegisterTask"这个方法会是什么样子,如何存储任务,如何使任务'wait'后来'complete'以Task为结果。
有什么想法吗?我研究了 Toub 的 Async Coordination Primitives 系列,但我不确定这是否是正确的方法。
您需要一个 TaskCompletionSource<byte[]>
用作同步构造,一个 ConcurrentDictionary
在 id 和 TCS 与侦听器之间映射:
ConcurrentDictionary<UniqueId, TaskCompletionSource<byte[]>> _dictionary;
async Task Listen(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
using (var ns = new NetworkStream(_socket))
{
byte[] responseBuffer = await ReceiveMessageAsync(ns);
var id = ExtractId(responseBuffer);
TaskCompletionSource<byte[]> tcs;
if (dictionary.TryRemove(id, out tcs))
{
tcs.SetResult(responseBuffer);
}
else
{
// error
}
}
}
}
Task RegisterTask(UniqueId id)
{
var tcs = new TaskCompletionSource<byte[]>();
if (!_dictionary.TryAdd(id, tcs))
{
// error
}
return tcs.Task;
}
但是,正如 Stephen Cleary 所建议的,您可能希望为此使用现有的解决方案。
因此您需要将所有这些包装到一个新的 class 中,因为您将需要在您阅读的地方和您写的地方之间共享状态。
每次写入流时,您都需要接受唯一 ID,并在将 ID 映射到 TaskCompletionSource
的查找中添加一个条目。然后,写入方法可以 return 来自该 TCS 的 Task
。
然后您可以有一个单独的 reader,它只是坐在那里从您的流中读取数据,找到与该响应的 ID 关联的字典条目,并设置其结果。
public class MyNetworkStream : IDisposable
{
private NetworkStream stream;
private ConcurrentDictionary<int, TaskCompletionSource<byte[]>> lookup =
new ConcurrentDictionary<int, TaskCompletionSource<byte[]>>();
private CancellationTokenSource disposalCTS = new CancellationTokenSource();
public MyNetworkStream(Socket socket)
{
stream = new NetworkStream(socket);
KeepReading();
}
public void Dispose()
{
disposalCTS.Cancel();
stream.Dispose();
}
public Task<byte[]> WriteAndWaitAsync(byte[] buffer, int offset,
int count, int uniqueID)
{
var tcs = lookup.GetOrAdd(uniqueID, new TaskCompletionSource<byte[]>());
stream.WriteAsync(buffer, offset, count);
return tcs.Task;
}
private async void KeepReading()
{
try
{
//TODO figure out what you want for a buffer size so that you can
//read a block of the appropriate size.
byte[] buffer = null;
while (!disposalCTS.IsCancellationRequested)
{
//TODO edit offset and count appropriately
await stream.ReadAsync(buffer, 0, 0, disposalCTS.Token);
int id = GetUniqueIdFromBlock(buffer);
TaskCompletionSource<byte[]> tcs;
if (lookup.TryRemove(id, out tcs))
tcs.TrySetResult(buffer);
else
{
//TODO figure out what to do here
}
}
}
catch (Exception e)
{
foreach (var tcs in lookup.Values)
tcs.TrySetException(e);
Dispose();
//TODO consider any other necessary cleanup
}
}
private int GetUniqueIdFromBlock(byte[] buffer)
{
throw new NotImplementedException();
}
}
我有一个场景,其中多个线程通过单个套接字发送数据。已将唯一 ID 插入到消息中,并在响应消息中回显该唯一 ID。当套接字与单个客户端隔离时(正如预期的那样),一切都很好。现在我正在为客户端等待特定响应的多个线程寻找异步/等待模式。
一些代码来演示:
using (var ns = new NetworkStream(_socket))
{
byte[] requestBuffer = GetBuffer(request);
await ns.WriteAsync(requestBuffer, 0, request.Length);
byte[] responseBuffer = await ReceiveMessageAsync(ns);
// process response message
}
以上示例不适用于多线程场景,因为消息可以按任何顺序返回,因此下一条消息可能不属于当前客户端。我的想法是,客户端将使用其唯一 ID(将其存储在字典中)注册委托或任务,当消息返回时带有该唯一 ID,委托或任务将是 'completed' 和响应字节。我猜这会很容易用 EventHandler 实现,但我正在寻找一种等待响应的方法。
例如:
using (var ns = new CustomNetworkStream(_socket))
{
Task waitTask = ns.RegisterTask(this.UniqueId);
byte[] requestBuffer = GetBuffer(request);
await ns.WriteAsync(requestBuffer, 0, request.Length);
byte[] responseBuffer = await waitTask;
// process response message
}
我不知道"RegisterTask"这个方法会是什么样子,如何存储任务,如何使任务'wait'后来'complete'以Task为结果。
有什么想法吗?我研究了 Toub 的 Async Coordination Primitives 系列,但我不确定这是否是正确的方法。
您需要一个 TaskCompletionSource<byte[]>
用作同步构造,一个 ConcurrentDictionary
在 id 和 TCS 与侦听器之间映射:
ConcurrentDictionary<UniqueId, TaskCompletionSource<byte[]>> _dictionary;
async Task Listen(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
using (var ns = new NetworkStream(_socket))
{
byte[] responseBuffer = await ReceiveMessageAsync(ns);
var id = ExtractId(responseBuffer);
TaskCompletionSource<byte[]> tcs;
if (dictionary.TryRemove(id, out tcs))
{
tcs.SetResult(responseBuffer);
}
else
{
// error
}
}
}
}
Task RegisterTask(UniqueId id)
{
var tcs = new TaskCompletionSource<byte[]>();
if (!_dictionary.TryAdd(id, tcs))
{
// error
}
return tcs.Task;
}
但是,正如 Stephen Cleary 所建议的,您可能希望为此使用现有的解决方案。
因此您需要将所有这些包装到一个新的 class 中,因为您将需要在您阅读的地方和您写的地方之间共享状态。
每次写入流时,您都需要接受唯一 ID,并在将 ID 映射到 TaskCompletionSource
的查找中添加一个条目。然后,写入方法可以 return 来自该 TCS 的 Task
。
然后您可以有一个单独的 reader,它只是坐在那里从您的流中读取数据,找到与该响应的 ID 关联的字典条目,并设置其结果。
public class MyNetworkStream : IDisposable
{
private NetworkStream stream;
private ConcurrentDictionary<int, TaskCompletionSource<byte[]>> lookup =
new ConcurrentDictionary<int, TaskCompletionSource<byte[]>>();
private CancellationTokenSource disposalCTS = new CancellationTokenSource();
public MyNetworkStream(Socket socket)
{
stream = new NetworkStream(socket);
KeepReading();
}
public void Dispose()
{
disposalCTS.Cancel();
stream.Dispose();
}
public Task<byte[]> WriteAndWaitAsync(byte[] buffer, int offset,
int count, int uniqueID)
{
var tcs = lookup.GetOrAdd(uniqueID, new TaskCompletionSource<byte[]>());
stream.WriteAsync(buffer, offset, count);
return tcs.Task;
}
private async void KeepReading()
{
try
{
//TODO figure out what you want for a buffer size so that you can
//read a block of the appropriate size.
byte[] buffer = null;
while (!disposalCTS.IsCancellationRequested)
{
//TODO edit offset and count appropriately
await stream.ReadAsync(buffer, 0, 0, disposalCTS.Token);
int id = GetUniqueIdFromBlock(buffer);
TaskCompletionSource<byte[]> tcs;
if (lookup.TryRemove(id, out tcs))
tcs.TrySetResult(buffer);
else
{
//TODO figure out what to do here
}
}
}
catch (Exception e)
{
foreach (var tcs in lookup.Values)
tcs.TrySetException(e);
Dispose();
//TODO consider any other necessary cleanup
}
}
private int GetUniqueIdFromBlock(byte[] buffer)
{
throw new NotImplementedException();
}
}