管道缓冲区保留直到处理完成
Pipelines buffer preserving until processing is complete
我正在研究使用管道处理来自网络的二进制消息的可能性。
我将处理的二进制消息带有有效负载,最好将有效负载保持为二进制形式。
想法是读出整个消息并创建一个消息片段及其有效负载,一旦消息被完全读取,它将被传递到通道链进行处理,处理不会是即时的,可能需要一段时间或稍后执行,目标是 不让管道 reader 等待 直到处理完成,然后一旦消息处理完成,我将需要释放处理缓冲区到管道编写器。
现在我当然可以创建一个新的字节数组并复制来自管道写入器的数据,但这会破坏不复制的目的吗?所以据我所知,我需要在管道和通道之间进行一些缓冲区同步吗?
我观察了管道 reader 的可用 API (AdvanceTo),它可以告诉管道 reader 消耗了什么和检查了什么,但无法解决这个问题可以在管道读取方法之外同步。
所以问题是是否有一些技术或示例可以实现这一目标。
从 TryRead
/ReadAsync
获得的缓冲区仅在您调用 AdvanceTo
之前有效,并且 期望 一旦您已经做到了:您报告为消耗的任何东西都可以回收用于其他地方(可能是 parallel/concurrent 读者)。严格来说:即使您 还没有 报告为消耗的位:一旦您调用 AdvanceTo
,您仍然不应该将其视为有效(尽管实际上,它很可能它们仍然是相同的段 - 只是:这不是调用者关心的问题;对于调用者来说,它仅在读取和推进之间有效。
这意味着您明确不能这样做:
while (...)
{
var result = await pipe.ReadAsync();
if (TryIdentifyFrameBoundary(out var frame)) {
BeginProcessingInBackground(frame); // <==== THIS IS A PROBLEM!
reader.AdvanceTo(frame.End, frame.End);
}
else if { // take nothing
reader.AdvanceTo(buffer.Start, buffer.End);
if (result.IsCompleted) break; // that's all folks
}
}
因为“后台”位在触发时可能正在读取其他人的数据(因为它已经被重用)。
因此:或者您需要将帧内容作为读取循环的一部分进行处理,或您将不得不复制数据,最有可能使用:
c#
var len = checked ((int)buffer.Length);
var oversized = ArrayPool<byte>.Shared.Rent(len);
buffer.CopyTo(oversized);
并将 oversized
传递给您的后台处理,记住只查看它的前 len
个字节。您 可以 将其作为 ReadOnlyMemory<byte>
传递,但您需要考虑到之后您还想 return 将其传递给 array-pool (可能在 finally
块中),并将其作为内存传递会使它变得有点尴尬(但并非不可能,感谢 MemoryMarshal.TryGetArray
)。
注意:在早期版本的管道 API 中,有一个 reference-counting 的元素,确实 允许您保留缓冲区,但是它有一些问题:
- 它使 API 变得非常复杂
- 它导致缓冲区泄漏
- “保留”的含义含糊不清;直到它被 重新使用 之前的计数是多少?或者完全释放?
因此该功能已被删除。
我正在研究使用管道处理来自网络的二进制消息的可能性。 我将处理的二进制消息带有有效负载,最好将有效负载保持为二进制形式。
想法是读出整个消息并创建一个消息片段及其有效负载,一旦消息被完全读取,它将被传递到通道链进行处理,处理不会是即时的,可能需要一段时间或稍后执行,目标是 不让管道 reader 等待 直到处理完成,然后一旦消息处理完成,我将需要释放处理缓冲区到管道编写器。
现在我当然可以创建一个新的字节数组并复制来自管道写入器的数据,但这会破坏不复制的目的吗?所以据我所知,我需要在管道和通道之间进行一些缓冲区同步吗? 我观察了管道 reader 的可用 API (AdvanceTo),它可以告诉管道 reader 消耗了什么和检查了什么,但无法解决这个问题可以在管道读取方法之外同步。
所以问题是是否有一些技术或示例可以实现这一目标。
从 TryRead
/ReadAsync
获得的缓冲区仅在您调用 AdvanceTo
之前有效,并且 期望 一旦您已经做到了:您报告为消耗的任何东西都可以回收用于其他地方(可能是 parallel/concurrent 读者)。严格来说:即使您 还没有 报告为消耗的位:一旦您调用 AdvanceTo
,您仍然不应该将其视为有效(尽管实际上,它很可能它们仍然是相同的段 - 只是:这不是调用者关心的问题;对于调用者来说,它仅在读取和推进之间有效。
这意味着您明确不能这样做:
while (...)
{
var result = await pipe.ReadAsync();
if (TryIdentifyFrameBoundary(out var frame)) {
BeginProcessingInBackground(frame); // <==== THIS IS A PROBLEM!
reader.AdvanceTo(frame.End, frame.End);
}
else if { // take nothing
reader.AdvanceTo(buffer.Start, buffer.End);
if (result.IsCompleted) break; // that's all folks
}
}
因为“后台”位在触发时可能正在读取其他人的数据(因为它已经被重用)。
因此:或者您需要将帧内容作为读取循环的一部分进行处理,或您将不得不复制数据,最有可能使用:
c#
var len = checked ((int)buffer.Length);
var oversized = ArrayPool<byte>.Shared.Rent(len);
buffer.CopyTo(oversized);
并将 oversized
传递给您的后台处理,记住只查看它的前 len
个字节。您 可以 将其作为 ReadOnlyMemory<byte>
传递,但您需要考虑到之后您还想 return 将其传递给 array-pool (可能在 finally
块中),并将其作为内存传递会使它变得有点尴尬(但并非不可能,感谢 MemoryMarshal.TryGetArray
)。
注意:在早期版本的管道 API 中,有一个 reference-counting 的元素,确实 允许您保留缓冲区,但是它有一些问题:
- 它使 API 变得非常复杂
- 它导致缓冲区泄漏
- “保留”的含义含糊不清;直到它被 重新使用 之前的计数是多少?或者完全释放?
因此该功能已被删除。