有没有办法使用并行处理从文件中读取块并按顺序将字符串连接在一起?
Is there a way to use Parallel processing to read chunks from a file and join together the string in order?
我看到很多关于如何使用并行添加数字的示例,但是我没有发现任何可以证明从流中并行读取多个块(比如每个块 512 字节)并将结果连接在一起的东西。
我想知道是否可以读取流的多个部分并将它们按正确的顺序连接在一起。
例如
假设以下文本文件
Bird
Cats
Dogs
从普通流中读取 5 个字节的块大小类似于:
byte[] buffer = new byte[5];
int bytesRead = 0;
StringBuilder sb = new StringBuilder();
using (Stream stream = new FileStream( "animals.txt", FileMode.Open, FileAccess.Read )) {
while ( (bytesRead = stream.Read( buffer, 0, buffer.Length )) > 0 ) {
sb.Append( Encoding.UTF8.GetString( buffer ) );
}
}
将读取每一行(所有行均为 5 个字节)并按顺序将它们连接在一起,以便生成的字符串与文件相同。
但是,考虑使用 this solution 之类的东西似乎可能会乱序加入它们。我也不知道如何在上述上下文中应用它来替换 where
循环。
我如何同时读取这些块并将它们附加到 StringBuilder
每次迭代的字节 - 不是迭代发生的顺序,而是正确的顺序所以我不会结束像
Cats
Bird
Dog
抱歉,我没有任何并行代码可以显示,因为这是 post 的原因。如果您想对数字求和,这似乎很容易,但要让它以如下方式工作:
- 以字节块的形式从流中读取(比如每个块 512 字节)
- 按照它们在流中的顺序附加到主结果,不一定是处理的顺序。
...似乎是一个艰巨的挑战
就其本质而言,流与并行处理不兼容。流的抽象是顺序访问。
可以将流内容依次读入数组,然后对其进行并行处理,达到预期效果(并行处理)。您甚至可以在流块到达时生成并行任务。
var tasks = new List<Task>();
do {
var buffer = new byte[blockSize];
var location = stream.Position;
stream.Read(buffer);
tasks.Add(ProcessAsync(buffer, location));
} while (!end of stream);
await Task.WhenAll(tasks.ToArray());
或者,如果您有随机访问权限,则可以生成并行任务,每个任务都包含从输入的特定部分读取、处理它并存储到结果的相应部分的指令。但是请注意,虽然可以随机访问文件,但访问仍然必须通过单个磁盘控制器......并且硬盘不是随机访问,即使它们公开了随机访问接口,也会导致非顺序读取模式大量时间浪费在寻找上,降低效率远低于从流式传输中获得的效率。 (SSD 不寻求,因此随机请求模式没有太多惩罚,但您也不会受益)
感谢@Kraang 在以下示例中的协作,匹配并行处理二进制数据的情况。
如果单独从字节读取,您可以使用并行处理来处理块,如下所示:
// the byte array goes here
byte[] data = new byte[N];
// the block size
int blockSize = 5;
// find how many chunks there are
int blockCount = 1 + (data.Length - 1) / blockSize;
byte[][] processedChunks = new byte[blockCount][];
Parallel.For( 0, blockCount, ( i ) => {
var offset = i * blockSize;
// set the buffer size to block size or remaining bytes whichever is smaller
var buffer = new byte[Math.Min( blockSize, data.Length - offset )];
// copy the bytes from data to the buffer
Buffer.BlockCopy( data, i * blockSize, buffer, 0, buffer.Length );
// store buffer results into array in position `i` preserving order
processedChunks[i] = Process(buffer);
} );
// recombine chunks using e.g. LINQ SelectMany
我看到很多关于如何使用并行添加数字的示例,但是我没有发现任何可以证明从流中并行读取多个块(比如每个块 512 字节)并将结果连接在一起的东西。
我想知道是否可以读取流的多个部分并将它们按正确的顺序连接在一起。
例如
假设以下文本文件
Bird
Cats
Dogs
从普通流中读取 5 个字节的块大小类似于:
byte[] buffer = new byte[5];
int bytesRead = 0;
StringBuilder sb = new StringBuilder();
using (Stream stream = new FileStream( "animals.txt", FileMode.Open, FileAccess.Read )) {
while ( (bytesRead = stream.Read( buffer, 0, buffer.Length )) > 0 ) {
sb.Append( Encoding.UTF8.GetString( buffer ) );
}
}
将读取每一行(所有行均为 5 个字节)并按顺序将它们连接在一起,以便生成的字符串与文件相同。
但是,考虑使用 this solution 之类的东西似乎可能会乱序加入它们。我也不知道如何在上述上下文中应用它来替换 where
循环。
我如何同时读取这些块并将它们附加到 StringBuilder
每次迭代的字节 - 不是迭代发生的顺序,而是正确的顺序所以我不会结束像
Cats
Bird
Dog
抱歉,我没有任何并行代码可以显示,因为这是 post 的原因。如果您想对数字求和,这似乎很容易,但要让它以如下方式工作:
- 以字节块的形式从流中读取(比如每个块 512 字节)
- 按照它们在流中的顺序附加到主结果,不一定是处理的顺序。
...似乎是一个艰巨的挑战
就其本质而言,流与并行处理不兼容。流的抽象是顺序访问。
可以将流内容依次读入数组,然后对其进行并行处理,达到预期效果(并行处理)。您甚至可以在流块到达时生成并行任务。
var tasks = new List<Task>();
do {
var buffer = new byte[blockSize];
var location = stream.Position;
stream.Read(buffer);
tasks.Add(ProcessAsync(buffer, location));
} while (!end of stream);
await Task.WhenAll(tasks.ToArray());
或者,如果您有随机访问权限,则可以生成并行任务,每个任务都包含从输入的特定部分读取、处理它并存储到结果的相应部分的指令。但是请注意,虽然可以随机访问文件,但访问仍然必须通过单个磁盘控制器......并且硬盘不是随机访问,即使它们公开了随机访问接口,也会导致非顺序读取模式大量时间浪费在寻找上,降低效率远低于从流式传输中获得的效率。 (SSD 不寻求,因此随机请求模式没有太多惩罚,但您也不会受益)
感谢@Kraang 在以下示例中的协作,匹配并行处理二进制数据的情况。
如果单独从字节读取,您可以使用并行处理来处理块,如下所示:
// the byte array goes here
byte[] data = new byte[N];
// the block size
int blockSize = 5;
// find how many chunks there are
int blockCount = 1 + (data.Length - 1) / blockSize;
byte[][] processedChunks = new byte[blockCount][];
Parallel.For( 0, blockCount, ( i ) => {
var offset = i * blockSize;
// set the buffer size to block size or remaining bytes whichever is smaller
var buffer = new byte[Math.Min( blockSize, data.Length - offset )];
// copy the bytes from data to the buffer
Buffer.BlockCopy( data, i * blockSize, buffer, 0, buffer.Length );
// store buffer results into array in position `i` preserving order
processedChunks[i] = Process(buffer);
} );
// recombine chunks using e.g. LINQ SelectMany