一种多线程文件处理方法
An approach to multithreaded file processing
我有一个很大的文件(> 15 GB
)(不管是什么文件)。
我必须读取文件,对数据进行一些处理,然后将处理后的数据写入空白文件。
我分块做。每个块包含某种 header ,然后是数据。最简单的多块文件将包含:
Number of block bytes
Block bytes
Number of block bytes
Block bytes
因此,我创建了一个线程用于逐块读取文件,一些线程用于处理每个读取的块,还有一个线程用于逐块写入已处理的数据。
我在管理这些线程时遇到了一些问题。
我不知道每个块的处理顺序,但我必须按照读取的顺序将块写入文件。
所以,我的问题是我必须使用哪种方法来管理多线程处理。
我想,如果我使用 producer concumer 模式可能会更好。在这种情况下最好使用哪种数据结构来存储已经处理过的数据?我有一个猜测 - 一个基于数组的堆栈,我需要在开始写入之前排序一次。
但我不确定。所以,请帮我一个方法。
//sample of my code, but without any logic of threads managing
public class DataBlock
{
public byte[] Data { get; }
public long Index { get; }
public DataBlock(byte[] data, long index)
{
this.Data = data;
this.Index = index;
}
}
int bufferSize = 1024*64; //65536
long processedBlockCounter = 0L;
MyStack<DataBlock> processedBlockStore = new MyStack<DataBlock>();
using (FileStream fs = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize))
{
using (BufferedStream bs = new BufferedStream(fs, bufferSize))
{
byte[] buffer = new byte[bufferSize];
int byteRead;
while ((byteRead = bs.Read(buffer, 0, bufferSize)) > 0)
{
byte[] originalBytes;
using (MemoryStream mStream = new MemoryStream())
{
mStream.Write(buffer, 0, byteRead);
originalBytes = mStream.ToArray();
}
long dataBlockIndex = Interlocked.Increment(ref processedBlockCounter);
Thread processThread = new Thread(() =>
{
byte[] processedBytes = MyProcessor.Process(originalBytes);
DataBlock processedBlock = new DataBlock(processedBytes, processedBlockCounter);
lock(processedBlockStore)
{
processedBlockStore.Add(processedBlock);
}
});
processThread.Start();
}
}
}
您正在为每次迭代创建新线程。那不会扩大规模。我建议您改为使用 ThreadPool。首选方法是使用内部使用 ThreadPool 的 TPL。
由于您需要排序和并行处理,而且它们不是齐头并进的,如果可以的话,您可以使代码完全同步。
如果您需要并行处理,我推荐以下 Fork-Join 策略,因为您的文件大于 15 GB,而且您的处理也很耗时。
- 分块化文件
- 为每个块启动一个任务
- 让每个任务将输出写入一个名为 index.txt 的临时文件。
1.txt
、2.txt
等
- 等待所有任务完成
- 最后读取这些临时文件并按顺序创建输出文件。
- 然后当然是删除那些临时文件。你完成了。
我有一个很大的文件(> 15 GB
)(不管是什么文件)。
我必须读取文件,对数据进行一些处理,然后将处理后的数据写入空白文件。
我分块做。每个块包含某种 header ,然后是数据。最简单的多块文件将包含:
Number of block bytes
Block bytes
Number of block bytes
Block bytes
因此,我创建了一个线程用于逐块读取文件,一些线程用于处理每个读取的块,还有一个线程用于逐块写入已处理的数据。
我在管理这些线程时遇到了一些问题。
我不知道每个块的处理顺序,但我必须按照读取的顺序将块写入文件。
所以,我的问题是我必须使用哪种方法来管理多线程处理。
我想,如果我使用 producer concumer 模式可能会更好。在这种情况下最好使用哪种数据结构来存储已经处理过的数据?我有一个猜测 - 一个基于数组的堆栈,我需要在开始写入之前排序一次。
但我不确定。所以,请帮我一个方法。
//sample of my code, but without any logic of threads managing
public class DataBlock
{
public byte[] Data { get; }
public long Index { get; }
public DataBlock(byte[] data, long index)
{
this.Data = data;
this.Index = index;
}
}
int bufferSize = 1024*64; //65536
long processedBlockCounter = 0L;
MyStack<DataBlock> processedBlockStore = new MyStack<DataBlock>();
using (FileStream fs = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize))
{
using (BufferedStream bs = new BufferedStream(fs, bufferSize))
{
byte[] buffer = new byte[bufferSize];
int byteRead;
while ((byteRead = bs.Read(buffer, 0, bufferSize)) > 0)
{
byte[] originalBytes;
using (MemoryStream mStream = new MemoryStream())
{
mStream.Write(buffer, 0, byteRead);
originalBytes = mStream.ToArray();
}
long dataBlockIndex = Interlocked.Increment(ref processedBlockCounter);
Thread processThread = new Thread(() =>
{
byte[] processedBytes = MyProcessor.Process(originalBytes);
DataBlock processedBlock = new DataBlock(processedBytes, processedBlockCounter);
lock(processedBlockStore)
{
processedBlockStore.Add(processedBlock);
}
});
processThread.Start();
}
}
}
您正在为每次迭代创建新线程。那不会扩大规模。我建议您改为使用 ThreadPool。首选方法是使用内部使用 ThreadPool 的 TPL。
由于您需要排序和并行处理,而且它们不是齐头并进的,如果可以的话,您可以使代码完全同步。
如果您需要并行处理,我推荐以下 Fork-Join 策略,因为您的文件大于 15 GB,而且您的处理也很耗时。
- 分块化文件
- 为每个块启动一个任务
- 让每个任务将输出写入一个名为 index.txt 的临时文件。
1.txt
、2.txt
等 - 等待所有任务完成
- 最后读取这些临时文件并按顺序创建输出文件。
- 然后当然是删除那些临时文件。你完成了。