异步使用 NamedPipeServerStream 和 NamedPipeClientStream
Using NamedPipeServerStream and NamedPipeClientStream asynchronously
我对 server/client 架构有以下要求:
写一个异步工作的server/client。
通信需要双工,即两端读写。
多个客户端可以在任何给定时间连接到服务器。
Server/client 应该等到它们可用并最终建立连接。
客户端连接后,它应该写入流。
然后服务器应该从流中读取并将响应写回客户端。
最后,客户端应该读取响应,通信应该结束。
因此,考虑到以下要求,我编写了以下代码,但我不太确定,因为管道文档有些缺乏,不幸的是,代码似乎无法正常工作,它挂在某个点。
namespace PipesAsyncAwait471
{
using System;
using System.Collections.Generic;
using System.IO.Pipes;
using System.Linq;
using System.Threading.Tasks;
internal class Program
{
private static async Task Main()
{
List<Task> tasks = new List<Task> {
HandleRequestAsync(),
};
tasks.AddRange(Enumerable.Range(0, 10).Select(i => SendRequestAsync(i, 0, 5)));
await Task.WhenAll(tasks);
}
private static async Task HandleRequestAsync()
{
using (NamedPipeServerStream server = new NamedPipeServerStream("MyPipe",
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Message,
PipeOptions.Asynchronous))
{
Console.WriteLine("Waiting...");
await server.WaitForConnectionAsync().ConfigureAwait(false);
if (server.IsConnected)
{
Console.WriteLine("Connected");
if (server.CanRead) {
// Read something...
}
if (server.CanWrite) {
// Write something...
await server.FlushAsync().ConfigureAwait(false);
server.WaitForPipeDrain();
}
server.Disconnect();
await HandleRequestAsync().ConfigureAwait(false);
}
}
}
private static async Task SendRequestAsync(int index, int counter, int max)
{
using (NamedPipeClientStream client = new NamedPipeClientStream(".", "MyPipe", PipeDirection.InOut, PipeOptions.Asynchronous))
{
await client.ConnectAsync().ConfigureAwait(false);
if (client.IsConnected)
{
Console.WriteLine($"Index: {index} Counter: {counter}");
if (client.CanWrite) {
// Write something...
await client.FlushAsync().ConfigureAwait(false);
client.WaitForPipeDrain();
}
if (client.CanRead) {
// Read something...
}
}
if (counter <= max) {
await SendRequestAsync(index, ++counter, max).ConfigureAwait(false);
}
else {
Console.WriteLine($"{index} Done!");
}
}
}
}
}
假设:
我期望它工作的方式是当我调用 SendRequestAsync
时发出的所有请求同时执行,然后每个请求发出额外的请求直到它到达 6
最后,它应该打印 "Done!".
备注:
我在 .NET Framework 4.7.1 和 .NET Core 2.0 上进行了测试,结果相同。
客户端和服务器之间的通信总是在本地机器上,其中客户端是网络应用程序,可以将一些作业排队,例如启动 3rd 方进程,服务器将作为 Windows 服务部署在与部署这些客户端的 Web 服务器相同的机器上。
断开连接时,WaitForPipeDrain()
可能会因管道损坏而抛出 IOException
。
如果您的服务器 Task
发生这种情况,那么它将永远不会侦听下一个连接,并且所有剩余的客户端连接都会挂起 ConnectAsync()
。
如果在其中一个客户端任务中发生这种情况,则它将不会继续递归并增加该索引的计数器。
如果您将对 WaitForPipeDrain()
的调用包装在 try
/catch
中,程序将永远 运行 继续,因为您的函数 HandleRequestAsync()
是无限递归。
简而言之,要使其正常工作:
- 从
WaitForPipeDrain()
处理 IOException
HandleRequestAsync()
必须在某个时候完成。
这是经过一些迭代后的完整代码:
namespace PipesAsyncAwait471
{
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipes;
using System.Linq;
using System.Threading.Tasks;
internal class Program
{
private const int MAX_REQUESTS = 1000;
private static void Main()
{
var tasks = new List<Task> {
//Task.Run(() => HandleRequest(0))
HandleRequestAsync(0)
};
tasks.AddRange(Enumerable.Range(0, MAX_REQUESTS).Select(i => Task.Factory.StartNew(() => SendRequest(i), TaskCreationOptions.LongRunning)));
Task.WhenAll(tasks);
Console.ReadKey();
}
private static void HandleRequest(int counter)
{
try {
var server = new NamedPipeServerStream("MyPipe",
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Message,
PipeOptions.Asynchronous);
Console.WriteLine($"Waiting a client... {counter}");
server.BeginWaitForConnection(WaitForConnectionCallback, server);
}
catch (Exception ex) {
Console.WriteLine(ex);
}
void WaitForConnectionCallback(IAsyncResult result)
{
var server = (NamedPipeServerStream)result.AsyncState;
int index = -1;
try {
server.EndWaitForConnection(result);
HandleRequest(++counter);
if (server.IsConnected) {
var request = new byte[4];
server.BeginRead(request, 0, request.Length, ReadCallback, server);
index = BitConverter.ToInt32(request, 0);
Console.WriteLine($"{index} Request.");
var response = BitConverter.GetBytes(index);
server.BeginWrite(response, 0, response.Length, WriteCallback, server);
server.Flush();
server.WaitForPipeDrain();
Console.WriteLine($"{index} Pong.");
server.Disconnect();
Console.WriteLine($"{index} Disconnected.");
}
}
catch (IOException ex) {
Console.WriteLine($"{index}\n\t{ex}");
}
finally {
server.Dispose();
}
}
void ReadCallback(IAsyncResult result)
{
var server = (NamedPipeServerStream)result.AsyncState;
try {
server.EndRead(result);
}
catch (IOException ex) {
Console.WriteLine(ex);
}
}
void WriteCallback(IAsyncResult result)
{
var server = (NamedPipeServerStream)result.AsyncState;
try {
server.EndWrite(result);
}
catch (IOException ex) {
Console.WriteLine(ex);
}
}
}
private static async Task HandleRequestAsync(int counter)
{
NamedPipeServerStream server = null;
int index = -1;
try {
server = new NamedPipeServerStream("MyPipe",
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Message,
PipeOptions.Asynchronous);
Console.WriteLine($"Waiting a client... {counter}");
await server.WaitForConnectionAsync()
.ContinueWith(async t => await HandleRequestAsync(++counter).ConfigureAwait(false))
.ConfigureAwait(false);
if (server.IsConnected) {
var request = new byte[4];
await server.ReadAsync(request, 0, request.Length).ConfigureAwait(false);
index = BitConverter.ToInt32(request, 0);
Console.WriteLine($"{index} Request.");
var response = BitConverter.GetBytes(index);
await server.WriteAsync(response, 0, response.Length).ConfigureAwait(false);
await server.FlushAsync().ConfigureAwait(false);
server.WaitForPipeDrain();
Console.WriteLine($"{index} Pong.");
server.Disconnect();
Console.WriteLine($"{index} Disconnected.");
}
}
catch (IOException ex) {
Console.WriteLine($"{index}\n\t{ex}");
}
finally {
server?.Dispose();
}
}
private static void SendRequest(int index)
{
NamedPipeClientStream client = null;
try {
client = new NamedPipeClientStream(".", "MyPipe", PipeDirection.InOut, PipeOptions.None);
client.Connect();
var request = BitConverter.GetBytes(index);
client.Write(request, 0, request.Length);
client.Flush();
client.WaitForPipeDrain();
Console.WriteLine($"{index} Ping.");
var response = new byte[4];
client.Read(response, 0, response.Length);
index = BitConverter.ToInt32(response, 0);
Console.WriteLine($"{index} Response.");
}
catch (Exception ex) {
Console.WriteLine($"{index}\n\t{ex}");
}
finally {
client?.Dispose();
}
}
}
}
您可以对消息进行排序并观察以下内容:
正确打开和关闭连接。
数据收发正确
最后,服务器还在等待进一步的连接。
更新:
已将 PipeOptions.Asynchronous
更改为 PipeOptions.None
否则它似乎会在请求期间挂起,然后才立即处理它们。
PipeOptions.Asynchronous 只是导致与 PipeOptions.None 不同的执行顺序,这会在您的代码中暴露竞争条件/死锁。如果您使用任务管理器,您可以看到它的效果,例如,监视您的进程的线程数......您应该看到它以每秒大约 1 个线程的速度上升,直到它达到大约 100 个线程(可能 110 左右),此时您的代码 运行s 完成。或者,如果您在开头添加 ThreadPool.SetMinThreads(200, 200)。您的代码有一个问题,如果发生错误的排序(使用异步更有可能发生这种情况),您将创建一个循环,直到有足够的线程来 运行 所有并发的 ConnectAsyncs 你的main 方法已经排队,这并不是真正的异步,而只是创建一个工作项来调用同步 Connect 方法(这很不幸,像这样的问题是我敦促人们不要暴露异步 API 的原因之一队列工作项目调用同步方法)。 Source.
修改并简化了示例:
管道没有真正的异步 Connect
方法,ConnectAsync
在幕后使用 Task.Factory.StartNew
所以你也可以使用 Connect
和然后将调用同步 Connect
版本的方法(在我们的示例中为 SendRequest
)传递给 Task.Factory.StartNew
.
服务器现在是完全异步的,据我所知它可以正常工作。
为服务器添加了两个实现,一个使用回调,另一个利用 async/await 功能,只是因为我找不到这两个的好例子。
希望对您有所帮助。
我对 server/client 架构有以下要求:
写一个异步工作的server/client。
通信需要双工,即两端读写。
多个客户端可以在任何给定时间连接到服务器。
Server/client 应该等到它们可用并最终建立连接。
客户端连接后,它应该写入流。
然后服务器应该从流中读取并将响应写回客户端。
最后,客户端应该读取响应,通信应该结束。
因此,考虑到以下要求,我编写了以下代码,但我不太确定,因为管道文档有些缺乏,不幸的是,代码似乎无法正常工作,它挂在某个点。
namespace PipesAsyncAwait471
{
using System;
using System.Collections.Generic;
using System.IO.Pipes;
using System.Linq;
using System.Threading.Tasks;
internal class Program
{
private static async Task Main()
{
List<Task> tasks = new List<Task> {
HandleRequestAsync(),
};
tasks.AddRange(Enumerable.Range(0, 10).Select(i => SendRequestAsync(i, 0, 5)));
await Task.WhenAll(tasks);
}
private static async Task HandleRequestAsync()
{
using (NamedPipeServerStream server = new NamedPipeServerStream("MyPipe",
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Message,
PipeOptions.Asynchronous))
{
Console.WriteLine("Waiting...");
await server.WaitForConnectionAsync().ConfigureAwait(false);
if (server.IsConnected)
{
Console.WriteLine("Connected");
if (server.CanRead) {
// Read something...
}
if (server.CanWrite) {
// Write something...
await server.FlushAsync().ConfigureAwait(false);
server.WaitForPipeDrain();
}
server.Disconnect();
await HandleRequestAsync().ConfigureAwait(false);
}
}
}
private static async Task SendRequestAsync(int index, int counter, int max)
{
using (NamedPipeClientStream client = new NamedPipeClientStream(".", "MyPipe", PipeDirection.InOut, PipeOptions.Asynchronous))
{
await client.ConnectAsync().ConfigureAwait(false);
if (client.IsConnected)
{
Console.WriteLine($"Index: {index} Counter: {counter}");
if (client.CanWrite) {
// Write something...
await client.FlushAsync().ConfigureAwait(false);
client.WaitForPipeDrain();
}
if (client.CanRead) {
// Read something...
}
}
if (counter <= max) {
await SendRequestAsync(index, ++counter, max).ConfigureAwait(false);
}
else {
Console.WriteLine($"{index} Done!");
}
}
}
}
}
假设:
我期望它工作的方式是当我调用 SendRequestAsync
时发出的所有请求同时执行,然后每个请求发出额外的请求直到它到达 6
最后,它应该打印 "Done!".
备注:
我在 .NET Framework 4.7.1 和 .NET Core 2.0 上进行了测试,结果相同。
客户端和服务器之间的通信总是在本地机器上,其中客户端是网络应用程序,可以将一些作业排队,例如启动 3rd 方进程,服务器将作为 Windows 服务部署在与部署这些客户端的 Web 服务器相同的机器上。
断开连接时,WaitForPipeDrain()
可能会因管道损坏而抛出 IOException
。
如果您的服务器 Task
发生这种情况,那么它将永远不会侦听下一个连接,并且所有剩余的客户端连接都会挂起 ConnectAsync()
。
如果在其中一个客户端任务中发生这种情况,则它将不会继续递归并增加该索引的计数器。
如果您将对 WaitForPipeDrain()
的调用包装在 try
/catch
中,程序将永远 运行 继续,因为您的函数 HandleRequestAsync()
是无限递归。
简而言之,要使其正常工作:
- 从
WaitForPipeDrain()
处理 HandleRequestAsync()
必须在某个时候完成。
IOException
这是经过一些迭代后的完整代码:
namespace PipesAsyncAwait471
{
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipes;
using System.Linq;
using System.Threading.Tasks;
internal class Program
{
private const int MAX_REQUESTS = 1000;
private static void Main()
{
var tasks = new List<Task> {
//Task.Run(() => HandleRequest(0))
HandleRequestAsync(0)
};
tasks.AddRange(Enumerable.Range(0, MAX_REQUESTS).Select(i => Task.Factory.StartNew(() => SendRequest(i), TaskCreationOptions.LongRunning)));
Task.WhenAll(tasks);
Console.ReadKey();
}
private static void HandleRequest(int counter)
{
try {
var server = new NamedPipeServerStream("MyPipe",
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Message,
PipeOptions.Asynchronous);
Console.WriteLine($"Waiting a client... {counter}");
server.BeginWaitForConnection(WaitForConnectionCallback, server);
}
catch (Exception ex) {
Console.WriteLine(ex);
}
void WaitForConnectionCallback(IAsyncResult result)
{
var server = (NamedPipeServerStream)result.AsyncState;
int index = -1;
try {
server.EndWaitForConnection(result);
HandleRequest(++counter);
if (server.IsConnected) {
var request = new byte[4];
server.BeginRead(request, 0, request.Length, ReadCallback, server);
index = BitConverter.ToInt32(request, 0);
Console.WriteLine($"{index} Request.");
var response = BitConverter.GetBytes(index);
server.BeginWrite(response, 0, response.Length, WriteCallback, server);
server.Flush();
server.WaitForPipeDrain();
Console.WriteLine($"{index} Pong.");
server.Disconnect();
Console.WriteLine($"{index} Disconnected.");
}
}
catch (IOException ex) {
Console.WriteLine($"{index}\n\t{ex}");
}
finally {
server.Dispose();
}
}
void ReadCallback(IAsyncResult result)
{
var server = (NamedPipeServerStream)result.AsyncState;
try {
server.EndRead(result);
}
catch (IOException ex) {
Console.WriteLine(ex);
}
}
void WriteCallback(IAsyncResult result)
{
var server = (NamedPipeServerStream)result.AsyncState;
try {
server.EndWrite(result);
}
catch (IOException ex) {
Console.WriteLine(ex);
}
}
}
private static async Task HandleRequestAsync(int counter)
{
NamedPipeServerStream server = null;
int index = -1;
try {
server = new NamedPipeServerStream("MyPipe",
PipeDirection.InOut,
NamedPipeServerStream.MaxAllowedServerInstances,
PipeTransmissionMode.Message,
PipeOptions.Asynchronous);
Console.WriteLine($"Waiting a client... {counter}");
await server.WaitForConnectionAsync()
.ContinueWith(async t => await HandleRequestAsync(++counter).ConfigureAwait(false))
.ConfigureAwait(false);
if (server.IsConnected) {
var request = new byte[4];
await server.ReadAsync(request, 0, request.Length).ConfigureAwait(false);
index = BitConverter.ToInt32(request, 0);
Console.WriteLine($"{index} Request.");
var response = BitConverter.GetBytes(index);
await server.WriteAsync(response, 0, response.Length).ConfigureAwait(false);
await server.FlushAsync().ConfigureAwait(false);
server.WaitForPipeDrain();
Console.WriteLine($"{index} Pong.");
server.Disconnect();
Console.WriteLine($"{index} Disconnected.");
}
}
catch (IOException ex) {
Console.WriteLine($"{index}\n\t{ex}");
}
finally {
server?.Dispose();
}
}
private static void SendRequest(int index)
{
NamedPipeClientStream client = null;
try {
client = new NamedPipeClientStream(".", "MyPipe", PipeDirection.InOut, PipeOptions.None);
client.Connect();
var request = BitConverter.GetBytes(index);
client.Write(request, 0, request.Length);
client.Flush();
client.WaitForPipeDrain();
Console.WriteLine($"{index} Ping.");
var response = new byte[4];
client.Read(response, 0, response.Length);
index = BitConverter.ToInt32(response, 0);
Console.WriteLine($"{index} Response.");
}
catch (Exception ex) {
Console.WriteLine($"{index}\n\t{ex}");
}
finally {
client?.Dispose();
}
}
}
}
您可以对消息进行排序并观察以下内容:
正确打开和关闭连接。
数据收发正确
最后,服务器还在等待进一步的连接。
更新:
已将PipeOptions.Asynchronous
更改为PipeOptions.None
否则它似乎会在请求期间挂起,然后才立即处理它们。PipeOptions.Asynchronous 只是导致与 PipeOptions.None 不同的执行顺序,这会在您的代码中暴露竞争条件/死锁。如果您使用任务管理器,您可以看到它的效果,例如,监视您的进程的线程数......您应该看到它以每秒大约 1 个线程的速度上升,直到它达到大约 100 个线程(可能 110 左右),此时您的代码 运行s 完成。或者,如果您在开头添加 ThreadPool.SetMinThreads(200, 200)。您的代码有一个问题,如果发生错误的排序(使用异步更有可能发生这种情况),您将创建一个循环,直到有足够的线程来 运行 所有并发的 ConnectAsyncs 你的main 方法已经排队,这并不是真正的异步,而只是创建一个工作项来调用同步 Connect 方法(这很不幸,像这样的问题是我敦促人们不要暴露异步 API 的原因之一队列工作项目调用同步方法)。 Source.
修改并简化了示例:
管道没有真正的异步
Connect
方法,ConnectAsync
在幕后使用Task.Factory.StartNew
所以你也可以使用Connect
和然后将调用同步Connect
版本的方法(在我们的示例中为SendRequest
)传递给Task.Factory.StartNew
.服务器现在是完全异步的,据我所知它可以正常工作。
为服务器添加了两个实现,一个使用回调,另一个利用 async/await 功能,只是因为我找不到这两个的好例子。
希望对您有所帮助。