多线程文件压缩

Multithreading file compress

我刚刚开始使用线程, 我想写一个简单的文件压缩器。它应该创建两个后台线程——一个用于读取,另一个用于写入。第一个应该按小块读取文件并将它们放入队列中,其中 int - 是 chunkId。第二个线程应该使块出列并按顺序(使用 chunkId)将它们写入输出流(文件,该线程在开始时创建)。

我做到了。但我不明白为什么在我的程序结束并打开我的 gzip 文件后 - 我看到,我的块混合在一起,并且文件没有以前的顺序。

public static class Reader
{
    private static readonly object Locker = new object();

    private const int ChunkSize = 1024*1024;

    private static readonly int MaxThreads;
    private static readonly Queue<KeyValuePair<int, byte[]>> ChunksQueue;
    private static int _chunksComplete;

    static Reader()
    {
        MaxThreads = Environment.ProcessorCount;
        ChunksQueue = new Queue<KeyValuePair<int,byte[]>>(MaxThreads);
    }

    public static void Read(string filename)
    {
        _chunksComplete = 0;

        var tRead = new Thread(Reading) { IsBackground = true };
        var tWrite = new Thread(Writing) { IsBackground = true };

        tRead.Start(filename);
        tWrite.Start(filename);

        tRead.Join();
        tWrite.Join();

        Console.WriteLine("Finished");
    }

    private static void Writing(object threadContext)
    {
        var filename = (string) threadContext;

        using (var s = File.Create(filename + ".gz"))
        {
            while (true)
            {
                var dataPair = DequeueSafe();
                if (dataPair.Value == null)
                    return;

                while (dataPair.Key != _chunksComplete)
                {
                    Thread.Sleep(1);
                }

                Console.WriteLine("write chunk {0}", dataPair.Key);

                using (var gz = new GZipStream(s, CompressionMode.Compress, true))
                {
                    gz.Write(dataPair.Value, 0, dataPair.Value.Length);
                }

                _chunksComplete++;
            }
        }
    }

    private static void Reading(object threadContext)
    {
        var filename = (string) threadContext;

        using (var s = File.OpenRead(filename))
        {
            var counter = 0;
            var buffer = new byte[ChunkSize];
            while (s.Read(buffer, 0, buffer.Length) != 0)
            {
                while (ChunksQueue.Count == MaxThreads)
                {
                    Thread.Sleep(1);
                }

                Console.WriteLine("read chunk {0}", counter);

                var dataPair = new KeyValuePair<int, byte[]>(counter, buffer);

                EnqueueSafe(dataPair);

                counter++;
            }

            EnqueueSafe(new KeyValuePair<int, byte[]>(0, null));
        }
    }

    private static void EnqueueSafe(KeyValuePair<int, byte[]> dataPair)
    {
        lock (ChunksQueue)
        {
            ChunksQueue.Enqueue(dataPair);
        }
    }

    private static KeyValuePair<int, byte[]> DequeueSafe()
    {
        while (true)
        {
            lock (ChunksQueue)
            {
                if (ChunksQueue.Count > 0)
                {
                    return ChunksQueue.Dequeue();
                }
            }

            Thread.Sleep(1);
        }
    } 
}

更新: 我只能使用 .NET 3.5

Stream.Read() returns 它消耗的实际字节数。使用它来限制编写器的块大小。而且,由于涉及并发读取和写入,您将需要多个缓冲区。 尝试 4096 作为块大小。

Reader:

var buffer = new byte[ChunkSize]; 
int bytesRead = s.Read(buffer, 0, buffer.Length);

while (bytesRead != 0)
{  
   ...
   var dataPair = new KeyValuePair<int, byte[]>(bytesRead, buffer); 
   buffer = new byte[ChunkSize];
   bytesRead = s.Read(buffer, 0, buffer.Length);
}

作者:

 gz.Write(dataPair.Value, 0, dataPair.Key)

PS:可以通过添加空闲数据缓冲区池而不是每次都分配新缓冲区并使用事件(例如ManualResetEvent)来通知队列来提高性能empty, queue is full 而不是使用 Thread.Sleep().

虽然 确实提出了一个非常重要的观点,即 Stream.Read 填充 buffer 的字节可能少于您请求的字节数,但您遇到的主要问题是 只有一个byte[]你一遍又一遍地使用。

当您的读取循环开始读取第二个值时,它会覆盖位于您传递给队列的 dataPair 中的 byte[]。你必须有一个 buffer = new byte[ChunkSize]; inside 循环来解决这个问题。您还必须记录读取了多少字节,并且只写入了相同数量的字节。

你不需要保留pair中的counter因为Queue会保持顺序,使用pair中的int来存储记录的字节数就像 alexm 的例子一样。