在 C# 的多线程中处理多个文件的最佳方式是什么?

What is the best way to work with multiple files in multithread in C#?

我正在创建一个 Windows 表单应用程序,其中我 select 一个包含多个 *.txt 文件的文件夹。它们的长度可能从几千行 (kB) 到最多 5000 万行 (1GB) 不等。代码的每一行都有三个信息。 long 格式的日期、int 格式的位置 id 和 float 格式的值均以分号 (;) 分隔。我需要计算所有这些文件中的最小值和最大值,并告诉它在哪个文件中,然后是最频繁的值。

我已经验证了这些文件并将其存储在数组列表中。我正在打开一个线程来逐个读取文件,然后逐行读取数据。它工作正常,但是当有 1GB 的文件时,我 运行 内存不足。我尝试将值存储在字典中,其中键是日期,值是一个对象,其中包含从该行加载的所有信息以及文件名。我发现我不能使用字典,因为在大约 6M 值时,我 运行 内存不足。所以我应该在多线程中进行。我虽然可以 运行 两个线程,一个读取文件并将信息放入某种容器中,另一个读取文件并进行计算,然后从容器中删除值。但我不知道哪个容器可以做这样的事情。此外,我需要计算最频繁出现的值,因此需要将它们存储在某个地方,这让我回到了某种字典,但我已经知道我会 运行 内存不足。我对线程也没有太多经验,所以我不知道有什么可能。到目前为止,这是我的代码:

图形用户界面:

namespace STI {
    public partial class GUI : Form {
        private String path = null;
        public static ArrayList txtFiles;

        public GUI() {
            InitializeComponent();
            _GUI1 = this;
        }

       //I run it in thread. I thought I would run the second 
       //one here that would work with the values inputed in some container
        private void buttonRun_Click(object sender, EventArgs e) {
            ThreadDataProcessing processing = new ThreadDataProcessing();
            Thread t_process = new Thread(processing.runProcessing);
            t_process.Start();

            //ThreadDataCalculating calculating = new ThreadDataCalculating();
            //Thread t_calc = new Thread(calculating.runCalculation());
            //t_calc.Start();

        }


    }
}

ThreadProcessing.cs

namespace STI.thread_package {
    class ThreadDataProcessing {
        public static Dictionary<long, object> finalMap = new Dictionary<long, object>();

        public void runProcessing() {
            foreach (FileInfo file in GUI.txtFiles) {
                using (FileStream fs = File.Open(file.FullName.ToString(), FileMode.Open))
                using (BufferedStream bs = new BufferedStream(fs))
                using (StreamReader sr = new StreamReader(bs)) {
                    String line;
                    String[] splitted;
                    try { 
                        while ((line = sr.ReadLine()) != null) {
                            splitted = line.Split(';');

                            if (splitted.Length == 3) {
                                long date = long.Parse(splitted[0]);
                                int location = int.Parse(splitted[1]);
                                float value = float.Parse(splitted[2], CultureInfo.InvariantCulture);

                                Entry entry = new Entry(date, location, value, file.Name);

                                if (!finalMap.ContainsKey(entry.getDate())) {
                                    finalMap.Add(entry.getDate(), entry);

                                }
                            }
                        }
                        GUI._GUI1.update("File \"" + file.Name + "\" completed\n");
                    }
                    catch (FormatException ex) {
                        GUI._GUI1.update("Wrong file format.");
                    }
                    catch (OutOfMemoryException) {
                        GUI._GUI1.update("Out of memory");
                    }
                }

            }
        }
    }
}

以及我在其中放置值的对象: Entry.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace STI.entities_package {
    class Entry {
        private long date;
        private int location;
        private float value;
        private String fileName;
        private int count;

        public Entry(long date, int location, float value, String fileName) {
            this.date = date;
            this.location = location;
            this.value = value;
            this.fileName = fileName;

            this.count = 1;
        }

        public long getDate() {
            return date;
        }

        public int getLocation() {
            return location;
        }

        public String getFileName() {
            return fileName;
        }

    }
}

我认为多线程不会在这里帮助你 - 它可以帮助你将 IO 绑定任务与 CPU 绑定任务分开,但是你的 CPU 绑定任务太微不足道了,我认为他们不值得拥有自己的话题。多线程要做的就是不必要地增加问题的复杂性。

在常量内存中计算 min/max 是微不足道的:只需维护一个 minFile 和 maxFile 变量,当当前文件的值小于 minFile 或大于 maxFile 时,该变量就会更新。找到最频繁的值将需要更多的内存,但只有几百万个文件,您应该有足够的 RAM 来存储一个 Dictionary<float, int> 来维护每个值的频率,之后您遍历地图以确定哪个值的频率最高。如果由于某种原因你没有足够的 RAM(如果你 运行 内存不足,请确保关闭你的文件并收集垃圾,因为 Dictionary<float, int> 有几百万个条目应该适合不到 1 GB 的 RAM),那么您可以对文件进行多次传递:在第一次传递时,将值存储在 Dictionary<interval, int> 中,您将 MIN_FLOAT 和 [= 之间的间隔分开20=] 分成几千个子区间,然后在下一次传递中,您可以忽略所有不适合频率最高的区间的值,从而缩小字典的大小。但是,Dictionary<float, int> 应该适合内存,因此除非您开始处理数十亿个文件而不是数百万个文件,否则您可能不需要多遍过程。