如何使用 Kestrel ConnectionHandler 处理传入的 TCP 消息?
How to handle incoming TCP messages with a Kestrel ConnectionHandler?
我想为我的 .NET Core 项目创建一个 TCP 侦听器。我正在使用 Kestrel 并通过
为此配置了一个新的 ConnectionHandler
kestrelServerOptions.ListenLocalhost(5000, builder =>
{
builder.UseConnectionHandler<MyTCPConnectionHandler>();
});
所以到目前为止我所拥有的是
internal class MyTCPConnectionHandler : ConnectionHandler
{
public override async Task OnConnectedAsync(ConnectionContext connection)
{
IDuplexPipe pipe = connection.Transport;
PipeReader pipeReader = pipe.Input;
while (true)
{
ReadResult readResult = await pipeReader.ReadAsync();
ReadOnlySequence<byte> readResultBuffer = readResult.Buffer;
foreach (ReadOnlyMemory<byte> segment in readResultBuffer)
{
// read the current message
string messageSegment = Encoding.UTF8.GetString(segment.Span);
// send back an echo
await pipe.Output.WriteAsync(segment);
}
if (readResult.IsCompleted)
{
break;
}
pipeReader.AdvanceTo(readResultBuffer.Start, readResultBuffer.End);
}
}
}
当从 TCP 客户端向服务器应用程序发送消息时,代码工作正常。 await pipe.Output.WriteAsync(segment);
行现在就像一个回声。
出现一些问题
- 我应该向客户端发回什么样的响应才不会 运行 超时?
- 我应该什么时候发回回复?当
readResult.IsCompleted
returns 为真时?
- 我应该如何更改代码以获取客户端发送的整个消息?我是否应该将每个
messageSegment
存储在 List<string>
中,并在 readResult.IsCompleted
returns 为真时将其连接到单个字符串?
- 这完全取决于协议;在许多情况下,您什么都不做就可以了;在其他情况下,如果您只想说“我还在这里”,则会发送特定的“ping”/“pong”帧
- “何时”完全取决于协议; waiting for
readResult.IsCompleted
表示您正在等待入站套接字被标记为已关闭,这意味着在客户端关闭其出站套接字之前您不会发送任何内容;对于 single-shot 协议,这可能没问题;但在大多数情况下,您会希望查找单个入站帧,然后回复该帧(并重复)
- 听起来你可能确实在写一个 one-shot 通道,即客户端只向服务器发送一件事,然后:服务器只向客户端发送一件事; 在那种情况下,你可以这样做:
while (true)
{
var readResult = await pipeReader.ReadAsync();
if (readResult.IsCompleted)
{
// TODO: not shown; process readResult.Buffer
// tell the pipe that we consumed everything, and exit
pipeReader.AdvanceTo(readResultBuffer.End, readResultBuffer.End);
break;
}
else
{
// wait for the client to close their outbound; tell
// the pipe that we couldn't consume anything
pipeReader.AdvanceTo(readResultBuffer.Start, readResultBuffer.End);
}
至于:
Should I store each messageSegment
in a List<string>
and join it to a single string when
这里首先要考虑的是,每个缓冲区段包含的字符数不一定是准确的。由于您使用的是 UTF-8,这是一种 multi-byte 编码,一个段可能在开头和结尾包含小数字符,因此:对其进行解码要比这复杂一些。
正因为如此,通常会在缓冲区上检查IsSingleSegment
;如果这是真的,你可以使用简单的代码:
if (buffer.IsSingleSegment)
{
string message = Encoding.UTF8.GetString(s.FirstSpan);
DoSomethingWith(message);
}
else
{
// ... more complex
}
不连续缓冲区的情况要困难得多;基本上,您在这里有两个选择:
- 将段线性化为连续缓冲区,可能从
ArrayPool<byte>.Shared
租用超大缓冲区,并在租用缓冲区的 正确部分 上使用 UTF8.GetString
- 在编码上使用
GetDecoder()
API,并使用它来填充新字符串,这在旧框架中意味着覆盖新分配的字符串,或者在新框架中意味着使用 string.Create
API
坦率地说,“1”要简单得多。例如(未经测试):
public static string GetString(in this ReadOnlySequence<byte> payload,
Encoding encoding = null)
{
encoding ??= Encoding.UTF8;
return payload.IsSingleSegment ? encoding.GetString(payload.FirstSpan)
: GetStringSlow(payload, encoding);
static string GetStringSlow(in ReadOnlySequence<byte> payload, Encoding encoding)
{
// linearize
int length = checked((int)payload.Length);
var oversized = ArrayPool<byte>.Shared.Rent(length);
try
{
payload.CopyTo(oversized);
return encoding.GetString(oversized, 0, length);
}
finally
{
ArrayPool<byte>.Shared.Return(oversized);
}
}
}
我想为我的 .NET Core 项目创建一个 TCP 侦听器。我正在使用 Kestrel 并通过
为此配置了一个新的ConnectionHandler
kestrelServerOptions.ListenLocalhost(5000, builder =>
{
builder.UseConnectionHandler<MyTCPConnectionHandler>();
});
所以到目前为止我所拥有的是
internal class MyTCPConnectionHandler : ConnectionHandler
{
public override async Task OnConnectedAsync(ConnectionContext connection)
{
IDuplexPipe pipe = connection.Transport;
PipeReader pipeReader = pipe.Input;
while (true)
{
ReadResult readResult = await pipeReader.ReadAsync();
ReadOnlySequence<byte> readResultBuffer = readResult.Buffer;
foreach (ReadOnlyMemory<byte> segment in readResultBuffer)
{
// read the current message
string messageSegment = Encoding.UTF8.GetString(segment.Span);
// send back an echo
await pipe.Output.WriteAsync(segment);
}
if (readResult.IsCompleted)
{
break;
}
pipeReader.AdvanceTo(readResultBuffer.Start, readResultBuffer.End);
}
}
}
当从 TCP 客户端向服务器应用程序发送消息时,代码工作正常。 await pipe.Output.WriteAsync(segment);
行现在就像一个回声。
出现一些问题
- 我应该向客户端发回什么样的响应才不会 运行 超时?
- 我应该什么时候发回回复?当
readResult.IsCompleted
returns 为真时? - 我应该如何更改代码以获取客户端发送的整个消息?我是否应该将每个
messageSegment
存储在List<string>
中,并在readResult.IsCompleted
returns 为真时将其连接到单个字符串?
- 这完全取决于协议;在许多情况下,您什么都不做就可以了;在其他情况下,如果您只想说“我还在这里”,则会发送特定的“ping”/“pong”帧
- “何时”完全取决于协议; waiting for
readResult.IsCompleted
表示您正在等待入站套接字被标记为已关闭,这意味着在客户端关闭其出站套接字之前您不会发送任何内容;对于 single-shot 协议,这可能没问题;但在大多数情况下,您会希望查找单个入站帧,然后回复该帧(并重复) - 听起来你可能确实在写一个 one-shot 通道,即客户端只向服务器发送一件事,然后:服务器只向客户端发送一件事; 在那种情况下,你可以这样做:
while (true)
{
var readResult = await pipeReader.ReadAsync();
if (readResult.IsCompleted)
{
// TODO: not shown; process readResult.Buffer
// tell the pipe that we consumed everything, and exit
pipeReader.AdvanceTo(readResultBuffer.End, readResultBuffer.End);
break;
}
else
{
// wait for the client to close their outbound; tell
// the pipe that we couldn't consume anything
pipeReader.AdvanceTo(readResultBuffer.Start, readResultBuffer.End);
}
至于:
Should I store each
messageSegment
in aList<string>
and join it to a single string when
这里首先要考虑的是,每个缓冲区段包含的字符数不一定是准确的。由于您使用的是 UTF-8,这是一种 multi-byte 编码,一个段可能在开头和结尾包含小数字符,因此:对其进行解码要比这复杂一些。
正因为如此,通常会在缓冲区上检查IsSingleSegment
;如果这是真的,你可以使用简单的代码:
if (buffer.IsSingleSegment)
{
string message = Encoding.UTF8.GetString(s.FirstSpan);
DoSomethingWith(message);
}
else
{
// ... more complex
}
不连续缓冲区的情况要困难得多;基本上,您在这里有两个选择:
- 将段线性化为连续缓冲区,可能从
ArrayPool<byte>.Shared
租用超大缓冲区,并在租用缓冲区的 正确部分 上使用UTF8.GetString
- 在编码上使用
GetDecoder()
API,并使用它来填充新字符串,这在旧框架中意味着覆盖新分配的字符串,或者在新框架中意味着使用string.Create
API
坦率地说,“1”要简单得多。例如(未经测试):
public static string GetString(in this ReadOnlySequence<byte> payload,
Encoding encoding = null)
{
encoding ??= Encoding.UTF8;
return payload.IsSingleSegment ? encoding.GetString(payload.FirstSpan)
: GetStringSlow(payload, encoding);
static string GetStringSlow(in ReadOnlySequence<byte> payload, Encoding encoding)
{
// linearize
int length = checked((int)payload.Length);
var oversized = ArrayPool<byte>.Shared.Rent(length);
try
{
payload.CopyTo(oversized);
return encoding.GetString(oversized, 0, length);
}
finally
{
ArrayPool<byte>.Shared.Return(oversized);
}
}
}