如何使用 System.IO.Pipelines 包创建响应 TCP 侦听器?
How to create a responding TCP listener with the System.IO.Pipelines package?
我想使用 Kestrel 和 System.IO.Pipelines 包创建一个 TCP 侦听器。我收到的消息始终是 HL7 消息。示例消息如下:
MSH|^~&|MegaReg|XYZHospC|SuperOE|XYZImgCtr|20060529090131-0500||ADT^A01^ADT_A01|01052901|P|2.5
EVN||200605290901||||200605290900
PID|||56782445^^^UAReg^PI||KLEINSAMPLE^BARRY^Q^JR||19620910|M||2028-9^^HL70005^RA99113^^XYZ|260
GOODWIN CREST DRIVE^^BIRMINGHAM^AL^35209^^M~NICKELL’S PICKLES^10000 W
100TH AVE^BIRMINGHAM^AL^35200^^O|||||||0105I30001^^^99DEF^AN
PV1||I|W^389^1^UABH^^^^3||||12345^MORGAN^REX^J^^^MD^0010^UAMC^L||67890^GRAINGER^LUCY^X^^^MD^0010^UAMC^L|MED|||||A0||13579^POTTER^SHERMAN^T^^^MD^0010^UAMC^L|||||||||||||||||||||||||||200605290900
OBX|1|NM|^Body Height||1.80|m^Meter^ISO+|||||F OBX|2|NM|^Body
Weight||79|kg^Kilogram^ISO+|||||F AL1|1||^ASPIRIN DG1|1||786.50^CHEST
PAIN, UNSPECIFIED^I9|||A
唯一需要注意的重要事项是每条传入的 HL7 消息都以垂直制表符开头,因此您知道消息从哪里开始。每个 HL7 消息都包含多个段,所以我想我必须遍历每个段。处理请求后,我想发回一条 HL7 消息作为响应。首先我想出了这个
/// <summary>
/// First attempt at an HL7 connection handler: for each connection it runs
/// <c>FillPipe</c> on the transport's output and, only after that has finished,
/// <c>ReadPipe</c> on the transport's input.
/// </summary>
internal class HL7Listener : ConnectionHandler
{
/// <summary>
/// Entry point for a new connection. The two helpers are awaited one after the other,
/// so <c>ReadPipe</c> does not start until <c>FillPipe</c> has left its loop.
/// </summary>
/// <param name="connection">The accepted connection whose transport pipe is used.</param>
public override async Task OnConnectedAsync(ConnectionContext connection)
{
IDuplexPipe pipe = connection.Transport;
// NOTE(review): Output is presumably the outgoing (server -> client) direction of the
// transport, so FillPipe sends bytes to the peer instead of receiving anything - confirm.
await FillPipe(pipe.Output);
await ReadPipe(pipe.Input);
}
/// <summary>
/// Loops forever requesting writer memory, advancing the writer by a fixed 32 bytes and
/// flushing. Nothing is ever copied into the requested memory, so the advanced bytes are
/// whatever the buffer already contained.
/// </summary>
/// <param name="pipeWriter">The writer that is advanced, flushed and finally completed.</param>
private async Task FillPipe(PipeWriter pipeWriter)
{
const int minimumBufferSize = 512;
// The loop only ends when FlushAsync reports completion or an exception is caught.
while (true)
{
// 'memory' is obtained but never written to or read from below.
Memory<byte> memory = pipeWriter.GetMemory(minimumBufferSize);
try
{
// Hard-coded placeholder: always 32, so the "== 0" exit below can never be taken.
int bytesRead = 32; // not sure what to do here
if (bytesRead == 0)
{
break;
}
// Marks 32 bytes of the requested memory as written even though none were filled in.
pipeWriter.Advance(bytesRead);
}
catch (Exception ex)
{
// ... something failed ...
break;
}
FlushResult result = await pipeWriter.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
pipeWriter.Complete();
}
/// <summary>
/// Reads from the pipe and splits the buffered bytes on the vertical-tab byte
/// (<c>'\v'</c>); each slice preceding a vertical tab is treated as one "line".
/// </summary>
/// <param name="pipeReader">The reader that is consumed and finally completed.</param>
private async Task ReadPipe(PipeReader pipeReader)
{
while (true)
{
ReadResult result = await pipeReader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
SequencePosition? position;
do
{
// Look for the next vertical tab in what is left of the buffer.
position = buffer.PositionOf((byte)'\v');
if (position != null)
{
// Everything before the vertical tab; empty when the tab is the first byte.
ReadOnlySequence<byte> line = buffer.Slice(0, position.Value);
// ... Process the line ...
// Drop the processed slice together with the vertical tab itself.
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
}
}
while (position != null);
// Consumed up to the start of the unprocessed remainder; examined everything received.
pipeReader.AdvanceTo(buffer.Start, buffer.End);
if (result.IsCompleted)
{
break;
}
}
pipeReader.Complete();
}
}
不幸的是,我正在为一些事情而苦苦挣扎:
int bytesRead = 32;
部分,如何知道读取了多少字节?或者如何用writer实例读取?
- 目前调试器没有命中
// ... Process the line ...
处的代码。基本上我必须提取整个 HL7 消息,以便我可以使用我的 HL7 解析器来转换消息字符串。
- 我必须在哪里回复?调用
await ReadPipe(pipe.Input);
后?通过使用 await connection.Transport.Output.WriteAsync(/* the HL7 message to send back */);
?
您看过 David Fowler 的 TcpEcho 示例吗?我认为它相当规范,因为正是他在 .NET 开发博客上发布了 System.IO.Pipelines 的公告。
他的示例处理原始套接字。我已经将它改编为 ConnectionHandler API 和 HL7 消息(但是,我对 HL7 知之甚少):
/// <summary>
/// Kestrel connection handler that extracts HL7 messages from the connection's input pipe
/// and writes one response per message to its output pipe.
/// </summary>
/// <remarks>
/// A message is considered complete when the end-of-block byte 0x1C is immediately followed
/// by a carriage return 0x0D. The extracted slice still contains every framing byte that was
/// received (including any leading vertical tab), so strip those before handing the text to
/// an HL7 parser.
/// </remarks>
internal class HL7Listener : ConnectionHandler
{
    /// <summary>End-of-block byte (FS) that, followed by a carriage return, terminates a message.</summary>
    private const byte EndOfBlock = 0x1C;

    /// <summary>Carriage return that must directly follow <see cref="EndOfBlock"/>.</summary>
    private const byte CarriageReturn = 0x0D;

    /// <summary>
    /// Reads from the connection until the peer finishes sending, answering every complete
    /// message found in the buffered data.
    /// </summary>
    /// <param name="connection">The accepted connection whose transport pipe is used.</param>
    public override async Task OnConnectedAsync(ConnectionContext connection)
    {
        var input = connection.Transport.Input;
        var output = connection.Transport.Output;

        while (true)
        {
            var result = await input.ReadAsync();
            var buffer = result.Buffer;
            var peerStoppedReading = false;

            while (!peerStoppedReading && TryReadMessage(ref buffer, out ReadOnlySequence<byte> hl7Message))
            {
                var response = ProcessMessage(hl7Message);
                var flushResult = await output.WriteAsync(response);

                // A completed flush means nobody will read further output, so stop answering.
                peerStoppedReading = flushResult.IsCompleted;
            }

            // Always report progress for the data just read, including on the final read:
            // consumed = everything turned into messages, examined = everything received,
            // so the next ReadAsync only returns once more bytes have arrived.
            input.AdvanceTo(buffer.Start, buffer.End);

            if (peerStoppedReading || result.IsCompleted)
            {
                break;
            }
        }
    }

    /// <summary>
    /// Tries to cut one complete message (up to and including the 0x1C 0x0D terminator)
    /// off the front of <paramref name="buffer"/>.
    /// </summary>
    /// <param name="buffer">Buffered input; on success it is advanced past the returned message.</param>
    /// <param name="hl7Message">The message including its framing bytes, or <c>default</c> when none is complete.</param>
    /// <returns><c>true</c> when a complete message was extracted; otherwise <c>false</c>.</returns>
    public static bool TryReadMessage(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> hl7Message)
    {
        var unscanned = buffer;

        while (true)
        {
            SequencePosition? endOfBlock = unscanned.PositionOf(EndOfBlock);
            if (endOfBlock == null)
            {
                break;
            }

            if (TryMatchNextByte(ref buffer, endOfBlock.Value, CarriageReturn, out var lastBytePosition))
            {
                var messageBounds = buffer.GetPosition(1, lastBytePosition.Value); // Slice() is exclusive on the upper bound
                hl7Message = buffer.Slice(0, messageBounds);
                buffer = buffer.Slice(messageBounds); // remove message from buffer
                return true;
            }

            // This 0x1C is not (yet) followed by 0x0D. Keep looking for a later terminator
            // instead of getting stuck on the first 0x1C for the rest of the connection.
            unscanned = unscanned.Slice(unscanned.GetPosition(1, endOfBlock.Value));
        }

        hl7Message = default;
        return false;
    }

    /// <summary>
    /// Does the next byte after currentPosition match the provided value?
    /// </summary>
    /// <param name="buffer">The sequence that <paramref name="currentPosition"/> belongs to.</param>
    /// <param name="currentPosition">Position of the byte whose successor is inspected.</param>
    /// <param name="value">The byte value the successor must have.</param>
    /// <param name="nextPosition">Position of the successor when it matches; otherwise <c>null</c>.</param>
    /// <returns><c>true</c> when a successor exists and equals <paramref name="value"/>.</returns>
    private static bool TryMatchNextByte(ref ReadOnlySequence<byte> buffer, SequencePosition currentPosition, byte value, out SequencePosition? nextPosition)
    {
        var tail = buffer.Slice(currentPosition);
        if (tail.Length < 2)
        {
            // The byte after currentPosition has not been received yet.
            nextPosition = null;
            return false;
        }

        // Inspect exactly one byte. Searching a one-byte slice avoids scanning the whole
        // remainder and avoids comparing SequencePosition values, which is not reliable
        // when the two bytes sit on either side of a segment boundary.
        nextPosition = tail.Slice(1, 1).PositionOf(value);
        return nextPosition != null;
    }

    /// <summary>
    /// Decodes the received message and produces the bytes to send back.
    /// </summary>
    /// <param name="hl7Message">One complete message as extracted by <see cref="TryReadMessage"/>.</param>
    /// <returns>The response payload written to the connection.</returns>
    private ReadOnlyMemory<byte> ProcessMessage(ReadOnlySequence<byte> hl7Message)
    {
        // NOTE(review): the decoded text still contains the framing bytes; trim them before parsing.
        var incomingMessage = Encoding.UTF8.GetString(hl7Message.ToArray());
        // do something with the message and generate your response. I'm using UTF8 here
        // but not sure if that's valid for HL7.
        // NOTE(review): a real HL7 acknowledgement presumably needs the same framing as the
        // request (leading 0x0B, trailing 0x1C 0x0D) - confirm against the MLLP specification.
        return Encoding.UTF8.GetBytes("Response message: OK!");
    }
}
更新: 添加了关于 HL7 消息结构的最新信息。
我想使用 Kestrel 和 System.IO.Pipelines 包创建一个 TCP 侦听器。我收到的消息始终是 HL7 消息。示例消息如下:
MSH|^~&|MegaReg|XYZHospC|SuperOE|XYZImgCtr|20060529090131-0500||ADT^A01^ADT_A01|01052901|P|2.5 EVN||200605290901||||200605290900 PID|||56782445^^^UAReg^PI||KLEINSAMPLE^BARRY^Q^JR||19620910|M||2028-9^^HL70005^RA99113^^XYZ|260 GOODWIN CREST DRIVE^^BIRMINGHAM^AL^35209^^M~NICKELL’S PICKLES^10000 W 100TH AVE^BIRMINGHAM^AL^35200^^O|||||||0105I30001^^^99DEF^AN PV1||I|W^389^1^UABH^^^^3||||12345^MORGAN^REX^J^^^MD^0010^UAMC^L||67890^GRAINGER^LUCY^X^^^MD^0010^UAMC^L|MED|||||A0||13579^POTTER^SHERMAN^T^^^MD^0010^UAMC^L|||||||||||||||||||||||||||200605290900 OBX|1|NM|^Body Height||1.80|m^Meter^ISO+|||||F OBX|2|NM|^Body Weight||79|kg^Kilogram^ISO+|||||F AL1|1||^ASPIRIN DG1|1||786.50^CHEST PAIN, UNSPECIFIED^I9|||A
唯一需要注意的重要事项是每条传入的 HL7 消息都以垂直制表符开头,因此您知道消息从哪里开始。每个 HL7 消息都包含多个段,所以我想我必须遍历每个段。处理请求后,我想发回一条 HL7 消息作为响应。首先我想出了这个
/// <summary>
/// First attempt at an HL7 connection handler: for each connection it runs
/// <c>FillPipe</c> on the transport's output and, only after that has finished,
/// <c>ReadPipe</c> on the transport's input.
/// </summary>
internal class HL7Listener : ConnectionHandler
{
/// <summary>
/// Entry point for a new connection. The two helpers are awaited one after the other,
/// so <c>ReadPipe</c> does not start until <c>FillPipe</c> has left its loop.
/// </summary>
/// <param name="connection">The accepted connection whose transport pipe is used.</param>
public override async Task OnConnectedAsync(ConnectionContext connection)
{
IDuplexPipe pipe = connection.Transport;
// NOTE(review): Output is presumably the outgoing (server -> client) direction of the
// transport, so FillPipe sends bytes to the peer instead of receiving anything - confirm.
await FillPipe(pipe.Output);
await ReadPipe(pipe.Input);
}
/// <summary>
/// Loops forever requesting writer memory, advancing the writer by a fixed 32 bytes and
/// flushing. Nothing is ever copied into the requested memory, so the advanced bytes are
/// whatever the buffer already contained.
/// </summary>
/// <param name="pipeWriter">The writer that is advanced, flushed and finally completed.</param>
private async Task FillPipe(PipeWriter pipeWriter)
{
const int minimumBufferSize = 512;
// The loop only ends when FlushAsync reports completion or an exception is caught.
while (true)
{
// 'memory' is obtained but never written to or read from below.
Memory<byte> memory = pipeWriter.GetMemory(minimumBufferSize);
try
{
// Hard-coded placeholder: always 32, so the "== 0" exit below can never be taken.
int bytesRead = 32; // not sure what to do here
if (bytesRead == 0)
{
break;
}
// Marks 32 bytes of the requested memory as written even though none were filled in.
pipeWriter.Advance(bytesRead);
}
catch (Exception ex)
{
// ... something failed ...
break;
}
FlushResult result = await pipeWriter.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
pipeWriter.Complete();
}
/// <summary>
/// Reads from the pipe and splits the buffered bytes on the vertical-tab byte
/// (<c>'\v'</c>); each slice preceding a vertical tab is treated as one "line".
/// </summary>
/// <param name="pipeReader">The reader that is consumed and finally completed.</param>
private async Task ReadPipe(PipeReader pipeReader)
{
while (true)
{
ReadResult result = await pipeReader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
SequencePosition? position;
do
{
// Look for the next vertical tab in what is left of the buffer.
position = buffer.PositionOf((byte)'\v');
if (position != null)
{
// Everything before the vertical tab; empty when the tab is the first byte.
ReadOnlySequence<byte> line = buffer.Slice(0, position.Value);
// ... Process the line ...
// Drop the processed slice together with the vertical tab itself.
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
}
}
while (position != null);
// Consumed up to the start of the unprocessed remainder; examined everything received.
pipeReader.AdvanceTo(buffer.Start, buffer.End);
if (result.IsCompleted)
{
break;
}
}
pipeReader.Complete();
}
}
不幸的是,我正在为一些事情而苦苦挣扎:
int bytesRead = 32;
部分,如何知道读取了多少字节?或者如何用writer实例读取?- 目前调试器没有命中
// ... Process the line ...
处的代码。基本上我必须提取整个 HL7 消息,以便我可以使用我的 HL7 解析器来转换消息字符串。 - 我必须在哪里回复?调用
await ReadPipe(pipe.Input);
后?通过使用await connection.Transport.Output.WriteAsync(/* the HL7 message to send back */);
?
您看过 David Fowler 的 TcpEcho 示例吗?我认为它相当规范,因为正是他在 .NET 开发博客上发布了 System.IO.Pipelines 的公告。
他的示例处理原始套接字。我已经将它改编为 ConnectionHandler API 和 HL7 消息(但是,我对 HL7 知之甚少):
/// <summary>
/// Kestrel connection handler that extracts HL7 messages from the connection's input pipe
/// and writes one response per message to its output pipe.
/// </summary>
/// <remarks>
/// A message is considered complete when the end-of-block byte 0x1C is immediately followed
/// by a carriage return 0x0D. The extracted slice still contains every framing byte that was
/// received (including any leading vertical tab), so strip those before handing the text to
/// an HL7 parser.
/// </remarks>
internal class HL7Listener : ConnectionHandler
{
    /// <summary>End-of-block byte (FS) that, followed by a carriage return, terminates a message.</summary>
    private const byte EndOfBlock = 0x1C;

    /// <summary>Carriage return that must directly follow <see cref="EndOfBlock"/>.</summary>
    private const byte CarriageReturn = 0x0D;

    /// <summary>
    /// Reads from the connection until the peer finishes sending, answering every complete
    /// message found in the buffered data.
    /// </summary>
    /// <param name="connection">The accepted connection whose transport pipe is used.</param>
    public override async Task OnConnectedAsync(ConnectionContext connection)
    {
        var input = connection.Transport.Input;
        var output = connection.Transport.Output;

        while (true)
        {
            var result = await input.ReadAsync();
            var buffer = result.Buffer;
            var peerStoppedReading = false;

            while (!peerStoppedReading && TryReadMessage(ref buffer, out ReadOnlySequence<byte> hl7Message))
            {
                var response = ProcessMessage(hl7Message);
                var flushResult = await output.WriteAsync(response);

                // A completed flush means nobody will read further output, so stop answering.
                peerStoppedReading = flushResult.IsCompleted;
            }

            // Always report progress for the data just read, including on the final read:
            // consumed = everything turned into messages, examined = everything received,
            // so the next ReadAsync only returns once more bytes have arrived.
            input.AdvanceTo(buffer.Start, buffer.End);

            if (peerStoppedReading || result.IsCompleted)
            {
                break;
            }
        }
    }

    /// <summary>
    /// Tries to cut one complete message (up to and including the 0x1C 0x0D terminator)
    /// off the front of <paramref name="buffer"/>.
    /// </summary>
    /// <param name="buffer">Buffered input; on success it is advanced past the returned message.</param>
    /// <param name="hl7Message">The message including its framing bytes, or <c>default</c> when none is complete.</param>
    /// <returns><c>true</c> when a complete message was extracted; otherwise <c>false</c>.</returns>
    public static bool TryReadMessage(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> hl7Message)
    {
        var unscanned = buffer;

        while (true)
        {
            SequencePosition? endOfBlock = unscanned.PositionOf(EndOfBlock);
            if (endOfBlock == null)
            {
                break;
            }

            if (TryMatchNextByte(ref buffer, endOfBlock.Value, CarriageReturn, out var lastBytePosition))
            {
                var messageBounds = buffer.GetPosition(1, lastBytePosition.Value); // Slice() is exclusive on the upper bound
                hl7Message = buffer.Slice(0, messageBounds);
                buffer = buffer.Slice(messageBounds); // remove message from buffer
                return true;
            }

            // This 0x1C is not (yet) followed by 0x0D. Keep looking for a later terminator
            // instead of getting stuck on the first 0x1C for the rest of the connection.
            unscanned = unscanned.Slice(unscanned.GetPosition(1, endOfBlock.Value));
        }

        hl7Message = default;
        return false;
    }

    /// <summary>
    /// Does the next byte after currentPosition match the provided value?
    /// </summary>
    /// <param name="buffer">The sequence that <paramref name="currentPosition"/> belongs to.</param>
    /// <param name="currentPosition">Position of the byte whose successor is inspected.</param>
    /// <param name="value">The byte value the successor must have.</param>
    /// <param name="nextPosition">Position of the successor when it matches; otherwise <c>null</c>.</param>
    /// <returns><c>true</c> when a successor exists and equals <paramref name="value"/>.</returns>
    private static bool TryMatchNextByte(ref ReadOnlySequence<byte> buffer, SequencePosition currentPosition, byte value, out SequencePosition? nextPosition)
    {
        var tail = buffer.Slice(currentPosition);
        if (tail.Length < 2)
        {
            // The byte after currentPosition has not been received yet.
            nextPosition = null;
            return false;
        }

        // Inspect exactly one byte. Searching a one-byte slice avoids scanning the whole
        // remainder and avoids comparing SequencePosition values, which is not reliable
        // when the two bytes sit on either side of a segment boundary.
        nextPosition = tail.Slice(1, 1).PositionOf(value);
        return nextPosition != null;
    }

    /// <summary>
    /// Decodes the received message and produces the bytes to send back.
    /// </summary>
    /// <param name="hl7Message">One complete message as extracted by <see cref="TryReadMessage"/>.</param>
    /// <returns>The response payload written to the connection.</returns>
    private ReadOnlyMemory<byte> ProcessMessage(ReadOnlySequence<byte> hl7Message)
    {
        // NOTE(review): the decoded text still contains the framing bytes; trim them before parsing.
        var incomingMessage = Encoding.UTF8.GetString(hl7Message.ToArray());
        // do something with the message and generate your response. I'm using UTF8 here
        // but not sure if that's valid for HL7.
        // NOTE(review): a real HL7 acknowledgement presumably needs the same framing as the
        // request (leading 0x0B, trailing 0x1C 0x0D) - confirm against the MLLP specification.
        return Encoding.UTF8.GetBytes("Response message: OK!");
    }
}
更新: 添加了关于 HL7 消息结构的最新信息。