多线程异步模式

Multithreaded async pattern

我有一个场景,其中多个线程通过单个套接字发送数据。已将唯一 ID 插入到消息中,并在响应消息中回显该唯一 ID。当套接字与单个客户端隔离时(正如预期的那样),一切都很好。现在我正在为客户端等待特定响应的多个线程寻找异步/等待模式。

一些代码来演示:

using (var ns = new NetworkStream(_socket))
{
    byte[] requestBuffer = GetBuffer(request);
    await ns.WriteAsync(requestBuffer, 0, request.Length);
    byte[] responseBuffer = await ReceiveMessageAsync(ns);

    // process response message
}

以上示例不适用于多线程场景,因为消息可以按任何顺序返回,因此下一条消息可能不属于当前客户端。我的想法是,客户端将使用其唯一 ID(将其存储在字典中)注册委托或任务,当消息返回时带有该唯一 ID,委托或任务将是 'completed' 和响应字节。我猜这会很容易用 EventHandler 实现,但我正在寻找一种等待响应的方法。

例如:

using (var ns = new CustomNetworkStream(_socket))
{
    Task waitTask = ns.RegisterTask(this.UniqueId);

    byte[] requestBuffer = GetBuffer(request);
    await ns.WriteAsync(requestBuffer, 0, request.Length);

    byte[] responseBuffer = await waitTask;

    // process response message
}

我不知道"RegisterTask"这个方法会是什么样子,如何存储任务,如何使任务'wait'后来'complete'以Task为结果。

有什么想法吗?我研究了 Toub 的 Async Coordination Primitives 系列,但我不确定这是否是正确的方法。

您需要一个 TaskCompletionSource<byte[]> 用作同步构造,一个 ConcurrentDictionary 在 id 和 TCS 与侦听器之间映射:

ConcurrentDictionary<UniqueId, TaskCompletionSource<byte[]>> _dictionary;
async Task Listen(CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        using (var ns = new NetworkStream(_socket))
        {
            byte[] responseBuffer = await ReceiveMessageAsync(ns);
            var id = ExtractId(responseBuffer);
            TaskCompletionSource<byte[]> tcs;
            if (dictionary.TryRemove(id, out tcs))
            {
                tcs.SetResult(responseBuffer);
            }
            else
            {
                // error
            }
        }
    }
}

Task RegisterTask(UniqueId id)
{
    var tcs = new TaskCompletionSource<byte[]>();
    if (!_dictionary.TryAdd(id, tcs))
    {
        // error
    }
    return tcs.Task;
}

但是,正如 Stephen Cleary 所建议的,您可能希望为此使用现有的解决方案。

因此您需要将所有这些包装到一个新的 class 中,因为您将需要在您阅读的地方和您写的地方之间共享状态。

每次写入流时,您都需要接受唯一 ID,并在将 ID 映射到 TaskCompletionSource 的查找中添加一个条目。然后,写入方法可以 return 来自该 TCS 的 Task

然后您可以有一个单独的 reader,它只是坐在那里从您的流中读取数据,找到与该响应的 ID 关联的字典条目,并设置其结果。

public class MyNetworkStream : IDisposable
{
    private NetworkStream stream;
    private ConcurrentDictionary<int, TaskCompletionSource<byte[]>> lookup =
        new ConcurrentDictionary<int, TaskCompletionSource<byte[]>>();
    private CancellationTokenSource disposalCTS = new CancellationTokenSource();
    public MyNetworkStream(Socket socket)
    {
        stream = new NetworkStream(socket);
        KeepReading();
    }
    public void Dispose()
    {
        disposalCTS.Cancel();
        stream.Dispose();
    }

    public Task<byte[]> WriteAndWaitAsync(byte[] buffer, int offset, 
        int count, int uniqueID)
    {
        var tcs = lookup.GetOrAdd(uniqueID, new TaskCompletionSource<byte[]>());
        stream.WriteAsync(buffer, offset, count);
        return tcs.Task;
    }

    private async void KeepReading()
    {
        try
        {
            //TODO figure out what you want for a buffer size so that you can 
            //read a block of the appropriate size.
            byte[] buffer = null;
            while (!disposalCTS.IsCancellationRequested)
            {
                //TODO edit offset and count appropriately 
                await stream.ReadAsync(buffer, 0, 0, disposalCTS.Token);
                int id = GetUniqueIdFromBlock(buffer);
                TaskCompletionSource<byte[]> tcs;
                if (lookup.TryRemove(id, out tcs))
                    tcs.TrySetResult(buffer);
                else
                {
                    //TODO figure out what to do here
                }
            }
        }
        catch (Exception e)
        {
            foreach (var tcs in lookup.Values)
                tcs.TrySetException(e);
            Dispose();
            //TODO consider any other necessary cleanup
        }
    }

    private int GetUniqueIdFromBlock(byte[] buffer)
    {
        throw new NotImplementedException();
    }
}