如何通过数据表提高 CSV 上传的性能

How to improve performance of CSV upload via datatable

我有一个上传 CSV 文件的有效解决方案。目前,我使用 IFormCollection 让用户从一个视图上传多个 CSV 文件。

CSV 文件保存为临时文件如下:

List<string> fileLocations = new List<string>();
foreach (var formFile in files)
{
   filePath = Path.GetTempFileName();    
   if (formFile.Length > 0)
   {
       using (var stream = new FileStream(filePath, FileMode.Create))
       {
           await formFile.CopyToAsync(stream);
       }
   }

   fileLocations.Add(filePath);
}

我将文件位置列表发送到另一种方法(就在下面)。我遍历文件位置并从临时文件流式传输数据,然后使用数据 table 和 SqlBulkCopy 插入数据。我目前一次上传 50 到 200 个文件,每个文件大约 330KB。插入一百个,大约需要6分钟,也就是30-35MB左右。

public void SplitCsvData(string fileLocation, Guid uid)
        {
            MetaDataModel MetaDatas;
            List<RawDataModel> RawDatas;

            var reader = new StreamReader(File.OpenRead(fileLocation));
            List<string> listRows = new List<string>();
            while (!reader.EndOfStream)
            {
                listRows.Add(reader.ReadLine());
            }

            var metaData = new List<string>();
            var rawData = new List<string>();

            foreach (var row in listRows)
            {
                var rowName = row.Split(',')[0];
                bool parsed = int.TryParse(rowName, out int result);

                if (parsed == false)
                {
                    metaData.Add(row);
                }
                else
                {
                    rawData.Add(row);
                }
            }

         //Assigns the vertical header name and value to the object by splitting string 
         RawDatas = GetRawData.SplitRawData(rawData);
         SaveRawData(RawDatas);

         MetaDatas = GetMetaData.SplitRawData(rawData);
         SaveRawData(RawDatas);

        }

此代码然后将对象传递给以创建数据table并插入数据。

private DataTable CreateRawDataTable
{
   get
   {
       var dt = new DataTable();
       dt.Columns.Add("Id", typeof(int));
       dt.Columns.Add("SerialNumber", typeof(string));
       dt.Columns.Add("ReadingNumber", typeof(int));
       dt.Columns.Add("ReadingDate", typeof(string));
       dt.Columns.Add("ReadingTime", typeof(string));
       dt.Columns.Add("RunTime", typeof(string));
       dt.Columns.Add("Temperature", typeof(double));
       dt.Columns.Add("ProjectGuid", typeof(Guid));
       dt.Columns.Add("CombineDateTime", typeof(string));

        return dt;
  }
}

public void SaveRawData(List<RawDataModel> data)
{
       DataTable dt = CreateRawDataTable;
       var count = data.Count;          

       for (var i = 1; i < count; i++)
       {
           DataRow row = dt.NewRow();
           row["Id"] = data[i].Id;
           row["ProjectGuid"] = data[i].ProjectGuid;
           row["SerialNumber"] = data[i].SerialNumber;
           row["ReadingNumber"] = data[i].ReadingNumber;
           row["ReadingDate"] = data[i].ReadingDate;
           row["ReadingTime"] = data[i].ReadingTime;
           row["CombineDateTime"] = data[i].CombineDateTime;
           row["RunTime"] = data[i].RunTime;
           row["Temperature"] = data[i].Temperature;
           dt.Rows.Add(row);
        }

        using (var conn = new SqlConnection(connectionString))
        {
           conn.Open();
           using (SqlTransaction tr = conn.BeginTransaction())
           {
               using (var sqlBulk = new SqlBulkCopy(conn, SqlBulkCopyOptions.Default, tr))
               {
                   sqlBulk.BatchSize = 1000;
                   sqlBulk.DestinationTableName = "RawData";
                   sqlBulk.WriteToServer(dt);
               }
               tr.Commit();
           }
       }
   }

是否有其他方法或更好的方法来提高性能,以便减少上传时间,因为它可能需要很长时间,而且我看到内存使用量不断增加到 500MB 左右。

TIA

您可以通过删除 DataTable 并直接从输入流读取来提高性能。

SqlBulkCopy 有一个 WriteToServer 重载,它接受 IDataReader 而不是整个 DataTable。

CsvHelper 可以使用 StreamReader 作为输入来生成 CSV 文件。它提供 CsvDataReader 作为 CSV 数据之上的 IDataReader 实现。这允许直接从输入流读取并写入 SqlBulkCopy。

以下方法将从 IFormFile 读取,使用 CsvHelper 解析流并使用 CSV 字段配置 SqlBulkCopy 实例:

public async Task ToTable(IFormFile file, string table)
{
    using (var stream = file.OpenReadStream())
    using (var tx = new StreamReader(stream))
    using (var reader = new CsvReader(tx))
    using (var rd = new CsvDataReader(reader))
    {
        var headers = reader.Context.HeaderRecord;

        var bcp = new SqlBulkCopy(_connection)
        {
            DestinationTableName = table
        };
        //Assume the file headers and table fields have the same names
        foreach(var header in headers)
        {
            bcp.ColumnMappings.Add(header, header);
        }

        await bcp.WriteToServerAsync(rd);                
    }
}

这样就不会将任何内容写入临时 table 或缓存在内存中。上传的文件直接解析并写入数据库

除了@Panagiotis 的回答之外,您为什么不将文件处理与文件上传交织在一起?在异步方法中包装您的文件处理逻辑并将循环更改为 Parallel.Foreach 并在每个文件到达时处理它而不是等待所有文件?

private static readonly object listLock = new Object(); // only once at class level


    List<string> fileLocations = new List<string>();
    Parallel.ForEach(files, (formFile) => 
    {
       filePath = Path.GetTempFileName();    
       if (formFile.Length > 0)
       {
           using (var stream = new FileStream(filePath, FileMode.Create))
           {
               await formFile.CopyToAsync(stream);
           }

           await ProcessFileInToDbAsync(filePath); 
       }

       // Added lock for thread safety of the List 
       lock (listLock)
       {
           fileLocations.Add(filePath);
       }     
    });

感谢@Panagiotis Kanavos,我能够弄清楚该怎么做。首先,我调用方法的方式是将它们留在内存中。我拥有的 CSV 文件分为两部分,垂直元数据和通常的水平信息。所以我需要把它们分成两部分。将它们保存为 tmp 文件也会导致开销。它已经从需要 5-6 分钟变成现在需要 1 分钟,我想这对于包含 8,500 行的 100 个文件来说还不错。

调用方法:

public async Task<IActionResult> UploadCsvFiles(ICollection<IFormFile> files, IFormCollection fc)
{
   foreach (var f in files)
   {
       var getData = new GetData(_configuration);
       await getData.SplitCsvData(f, uid);
   }

   return whatever;
}

这是拆分的方法:

public async Task SplitCsvData(IFormFile file, string uid)
    {
        var data = string.Empty;
        var m = new List<string>();
        var r = new List<string>();

        var records = new List<string>();
        using (var stream = file.OpenReadStream())
        using (var reader = new StreamReader(stream))
        {
            while (!reader.EndOfStream)
            {
                var line = reader.ReadLine();
                var header = line.Split(',')[0].ToString();
                bool parsed = int.TryParse(header, out int result);
                if (!parsed)
                {
                    m.Add(line);
                }
                else
                {
                    r.Add(line);
                }
            }
        }

        //TODO: Validation
        //This splits the list into the Meta data model. This is just a single object, with static fields.
        var metaData = SplitCsvMetaData.SplitMetaData(m, uid);
        DataTable dtm = CreateMetaData(metaData);
        var serialNumber = metaData.LoggerId;
        await SaveMetaData("MetaData", dtm);

        //
        var lrd = new List<RawDataModel>();
        foreach (string row in r)
        {
            lrd.Add(new RawDataModel
            {
                Id = 0,
                SerialNumber = serialNumber,
                ReadingNumber = Convert.ToInt32(row.Split(',')[0]),
                ReadingDate = Convert.ToDateTime(row.Split(',')[1]).ToString("yyyy-MM-dd"),
                ReadingTime = Convert.ToDateTime(row.Split(',')[2]).ToString("HH:mm:ss"),
                RunTime = row.Split(',')[3].ToString(),
                Temperature = Convert.ToDouble(row.Split(',')[4]),
                ProjectGuid = uid.ToString(),
                CombineDateTime = Convert.ToDateTime(row.Split(',')[1] + " " + row.Split(',')[2]).ToString("yyyy-MM-dd HH:mm:ss")
            });
        }

        await SaveRawData("RawData", lrd);
    }

然后我将数据 table 用于元数据(100 个文件需要 20 秒),因为我将字段名称映射到列。

 public async Task SaveMetaData(string table, DataTable dt)
    {
        using (SqlBulkCopy sqlBulk = new SqlBulkCopy(_configuration.GetConnectionString("DefaultConnection"), SqlBulkCopyOptions.Default))
        { 
            sqlBulk.DestinationTableName = table;
            await sqlBulk.WriteToServerAsync(dt);
        }
    }

然后我将 FastMember 用于原始数据的大数据部分,这更像是传统的 CSV。

 public async Task SaveRawData(string table, IEnumerable<LogTagRawDataModel> lrd)
    {
        using (SqlBulkCopy sqlBulk = new SqlBulkCopy(_configuration.GetConnectionString("DefaultConnection"), SqlBulkCopyOptions.Default))
        using (var reader = ObjectReader.Create(lrd, "Id","SerialNumber", "ReadingNumber", "ReadingDate", "ReadingTime", "RunTime", "Temperature", "ProjectGuid", "CombineDateTime"))
        {                
            sqlBulk.DestinationTableName = table;
            await sqlBulk.WriteToServerAsync(reader);
        }  
    }

我相信这可以改进,但就目前而言,它非常有效。