为什么这个 System.IO.Pipelines 代码比基于流的代码慢得多?
Why is this System.IO.Pipelines code much slower than Stream-based code?
我编写了一个小解析程序来比较 .NET Core 中较旧的 System.IO.Stream
和较新的 System.IO.Pipelines
。我期望管道代码具有同等速度或更快。但是,它慢了大约 40%。
程序很简单:在一个100Mb的文本文件中搜索一个关键字,returns关键字的行号。这是 Stream 版本:
public static async Task<int> GetLineNumberUsingStreamAsync(
string file,
string searchWord)
{
using var fileStream = File.OpenRead(file);
using var lines = new StreamReader(fileStream, bufferSize: 4096);
int lineNumber = 1;
// ReadLineAsync returns null on stream end, exiting the loop
while (await lines.ReadLineAsync() is string line)
{
if (line.Contains(searchWord))
return lineNumber;
lineNumber++;
}
return -1;
}
我希望上面的流代码比下面的管道代码慢,因为流代码将字节编码为 StreamReader 中的字符串。管道代码通过对字节进行操作来避免这种情况:
public static async Task<int> GetLineNumberUsingPipeAsync(string file, string searchWord)
{
var searchBytes = Encoding.UTF8.GetBytes(searchWord);
using var fileStream = File.OpenRead(file);
var pipe = PipeReader.Create(fileStream, new StreamPipeReaderOptions(bufferSize: 4096));
var lineNumber = 1;
while (true)
{
var readResult = await pipe.ReadAsync().ConfigureAwait(false);
var buffer = readResult.Buffer;
if(TryFindBytesInBuffer(ref buffer, searchBytes, ref lineNumber))
{
return lineNumber;
}
pipe.AdvanceTo(buffer.End);
if (readResult.IsCompleted) break;
}
await pipe.CompleteAsync();
return -1;
}
以下是相关的辅助方法:
/// <summary>
/// Look for `searchBytes` in `buffer`, incrementing the `lineNumber` every
/// time we find a new line.
/// </summary>
/// <returns>true if we found the searchBytes, false otherwise</returns>
static bool TryFindBytesInBuffer(
ref ReadOnlySequence<byte> buffer,
in ReadOnlySpan<byte> searchBytes,
ref int lineNumber)
{
var bufferReader = new SequenceReader<byte>(buffer);
while (TryReadLine(ref bufferReader, out var line))
{
if (ContainsBytes(ref line, searchBytes))
return true;
lineNumber++;
}
return false;
}
static bool TryReadLine(
ref SequenceReader<byte> bufferReader,
out ReadOnlySequence<byte> line)
{
var foundNewLine = bufferReader.TryReadTo(out line, (byte)'\n', advancePastDelimiter: true);
if (!foundNewLine)
{
line = default;
return false;
}
return true;
}
static bool ContainsBytes(
ref ReadOnlySequence<byte> line,
in ReadOnlySpan<byte> searchBytes)
{
return new SequenceReader<byte>(line).TryReadTo(out var _, searchBytes);
}
我在上面使用 SequenceReader<byte>
,因为我的理解是 intelligent/faster 多于 ReadOnlySequence<byte>
;当它可以在单个 Span<byte>
.
上运行时,它有一条快速路径
这是基准测试结果 (.NET Core 3.1)。完整代码和 BenchmarkDotNet 结果可用 in this repo。
- GetLineNumberWithStreamAsync - 435.6 毫秒 同时分配 366.19 MB
- GetLineNumberUsingPipeAsync - 619.8 ms 同时分配 9.28 MB
我在管道代码中做错了什么吗?
更新: Evk 已回答问题。应用他的修复后,这里是新的基准数字:
- GetLineNumberWithStreamAsync - 452.2 毫秒 同时分配 366.19 MB
- GetLineNumberWithPipeAsync - 203.8 毫秒,同时分配了 9.28 MB
这可能不是您要找的解释,但我希望它能提供一些见解:
浏览一下现有的两种方法,它表明第二种解决方案在计算上比另一种更复杂,因为它有两个嵌套循环。
使用代码分析进行更深入的挖掘表明,第二个 (GetLineNumberUsingPipeAsync) 比使用 Stream 的 CPU 密集度高出近 21.5%(请检查屏幕截图,)并且它足够接近基准我得到的结果:
解决方案#1:683.7 毫秒,365.84 MB
解决方案#2:777.5 毫秒,9.08 MB
我认为原因是 SequenceReader.TryReadTo
的实现。 Here is the source code 这个方法。它使用非常简单的算法(读取第一个字节的匹配项,然后检查该匹配项之后的所有后续字节是否匹配,如果不匹配 - 向前推进 1 个字节并重复),并注意此实现中有很多方法称为“慢” (IsNextSlow
、TryReadToSlow
等等),所以至少在某些情况下,在某些情况下,它会退回到一些慢速路径。它还必须处理事实序列可能包含多个段,并保持位置。
在您的情况下,您可以避免使用 SequenceReader
专门用于搜索匹配项(但将其保留用于实际阅读行),例如进行此微小更改(TryReadTo
的重载也更多在这种情况下有效):
private static bool TryReadLine(ref SequenceReader<byte> bufferReader, out ReadOnlySpan<byte> line) {
// note that both `match` and `line` are now `ReadOnlySpan` and not `ReadOnlySequence`
var foundNewLine = bufferReader.TryReadTo(out ReadOnlySpan<byte> match, (byte) '\n', advancePastDelimiter: true);
if (!foundNewLine) {
line = default;
return false;
}
line = match;
return true;
}
然后:
private static bool ContainsBytes(ref ReadOnlySpan<byte> line, in ReadOnlySpan<byte> searchBytes) {
// line is now `ReadOnlySpan` so we can use efficient `IndexOf` method
return line.IndexOf(searchBytes) >= 0;
}
这将使您的管道代码 运行 比流代码更快。
我编写了一个小解析程序来比较 .NET Core 中较旧的 System.IO.Stream
和较新的 System.IO.Pipelines
。我期望管道代码具有同等速度或更快。但是,它慢了大约 40%。
程序很简单:在一个100Mb的文本文件中搜索一个关键字,returns关键字的行号。这是 Stream 版本:
public static async Task<int> GetLineNumberUsingStreamAsync(
string file,
string searchWord)
{
using var fileStream = File.OpenRead(file);
using var lines = new StreamReader(fileStream, bufferSize: 4096);
int lineNumber = 1;
// ReadLineAsync returns null on stream end, exiting the loop
while (await lines.ReadLineAsync() is string line)
{
if (line.Contains(searchWord))
return lineNumber;
lineNumber++;
}
return -1;
}
我希望上面的流代码比下面的管道代码慢,因为流代码将字节编码为 StreamReader 中的字符串。管道代码通过对字节进行操作来避免这种情况:
public static async Task<int> GetLineNumberUsingPipeAsync(string file, string searchWord)
{
var searchBytes = Encoding.UTF8.GetBytes(searchWord);
using var fileStream = File.OpenRead(file);
var pipe = PipeReader.Create(fileStream, new StreamPipeReaderOptions(bufferSize: 4096));
var lineNumber = 1;
while (true)
{
var readResult = await pipe.ReadAsync().ConfigureAwait(false);
var buffer = readResult.Buffer;
if(TryFindBytesInBuffer(ref buffer, searchBytes, ref lineNumber))
{
return lineNumber;
}
pipe.AdvanceTo(buffer.End);
if (readResult.IsCompleted) break;
}
await pipe.CompleteAsync();
return -1;
}
以下是相关的辅助方法:
/// <summary>
/// Look for `searchBytes` in `buffer`, incrementing the `lineNumber` every
/// time we find a new line.
/// </summary>
/// <returns>true if we found the searchBytes, false otherwise</returns>
static bool TryFindBytesInBuffer(
ref ReadOnlySequence<byte> buffer,
in ReadOnlySpan<byte> searchBytes,
ref int lineNumber)
{
var bufferReader = new SequenceReader<byte>(buffer);
while (TryReadLine(ref bufferReader, out var line))
{
if (ContainsBytes(ref line, searchBytes))
return true;
lineNumber++;
}
return false;
}
static bool TryReadLine(
ref SequenceReader<byte> bufferReader,
out ReadOnlySequence<byte> line)
{
var foundNewLine = bufferReader.TryReadTo(out line, (byte)'\n', advancePastDelimiter: true);
if (!foundNewLine)
{
line = default;
return false;
}
return true;
}
static bool ContainsBytes(
ref ReadOnlySequence<byte> line,
in ReadOnlySpan<byte> searchBytes)
{
return new SequenceReader<byte>(line).TryReadTo(out var _, searchBytes);
}
我在上面使用 SequenceReader<byte>
,因为我的理解是 intelligent/faster 多于 ReadOnlySequence<byte>
;当它可以在单个 Span<byte>
.
这是基准测试结果 (.NET Core 3.1)。完整代码和 BenchmarkDotNet 结果可用 in this repo。
- GetLineNumberWithStreamAsync - 435.6 毫秒 同时分配 366.19 MB
- GetLineNumberUsingPipeAsync - 619.8 ms 同时分配 9.28 MB
我在管道代码中做错了什么吗?
更新: Evk 已回答问题。应用他的修复后,这里是新的基准数字:
- GetLineNumberWithStreamAsync - 452.2 毫秒 同时分配 366.19 MB
- GetLineNumberWithPipeAsync - 203.8 毫秒,同时分配了 9.28 MB
这可能不是您要找的解释,但我希望它能提供一些见解:
浏览一下现有的两种方法,它表明第二种解决方案在计算上比另一种更复杂,因为它有两个嵌套循环。
使用代码分析进行更深入的挖掘表明,第二个 (GetLineNumberUsingPipeAsync) 比使用 Stream 的 CPU 密集度高出近 21.5%(请检查屏幕截图,)并且它足够接近基准我得到的结果:
解决方案#1:683.7 毫秒,365.84 MB
解决方案#2:777.5 毫秒,9.08 MB
我认为原因是 SequenceReader.TryReadTo
的实现。 Here is the source code 这个方法。它使用非常简单的算法(读取第一个字节的匹配项,然后检查该匹配项之后的所有后续字节是否匹配,如果不匹配 - 向前推进 1 个字节并重复),并注意此实现中有很多方法称为“慢” (IsNextSlow
、TryReadToSlow
等等),所以至少在某些情况下,在某些情况下,它会退回到一些慢速路径。它还必须处理事实序列可能包含多个段,并保持位置。
在您的情况下,您可以避免使用 SequenceReader
专门用于搜索匹配项(但将其保留用于实际阅读行),例如进行此微小更改(TryReadTo
的重载也更多在这种情况下有效):
private static bool TryReadLine(ref SequenceReader<byte> bufferReader, out ReadOnlySpan<byte> line) {
// note that both `match` and `line` are now `ReadOnlySpan` and not `ReadOnlySequence`
var foundNewLine = bufferReader.TryReadTo(out ReadOnlySpan<byte> match, (byte) '\n', advancePastDelimiter: true);
if (!foundNewLine) {
line = default;
return false;
}
line = match;
return true;
}
然后:
private static bool ContainsBytes(ref ReadOnlySpan<byte> line, in ReadOnlySpan<byte> searchBytes) {
// line is now `ReadOnlySpan` so we can use efficient `IndexOf` method
return line.IndexOf(searchBytes) >= 0;
}
这将使您的管道代码 运行 比流代码更快。