CsvHelper - 填充数据表缓慢(ish)

CsvHelper - Populating datatable slow(ish)

我把下面的代码拼在一起,用来读取一组特定的 CSV 文件。它可以工作,但仍在完善中。其中一部分代码(填充 DataTable 行——请参阅下面的片段)运行所花的时间与 SqlBulkCopy 操作一样长。想请教各位提高性能的建议。

在下面的代码中,以 50K 行为一批处理一个约 1500 万行的文件,耗时略低于 11.5 分钟。按阶段分解:SqlBulkCopy 耗时约 236K 毫秒(4 分钟),reader 只需约 105K 毫秒(约 1.5 分钟),而填充 DataTable 的部分耗时约 200K 毫秒(3.33 分钟)。

     csvTableTimer.Start();
     // Process row and populate datatable
     // NOTE(review): for every column this does TWO full reflection lookups
     // (GetType().GetProperty(...).GetValue(...)) — one for the null test and
     // one for the assignment. Reflection is slow; cache the value in a local
     // (or better, use precompiled getter delegates).
     DataRow dr = dt.NewRow();

          foreach (DataColumn dc in dt.Columns)
          {
               if (row.GetType().GetProperty(dc.ToString()).GetValue(row) != null)
               {
                    dr[dc.ToString()] = row.GetType().GetProperty(dc.ToString()).GetValue(row);
               }
           }
           dt.Rows.Add(dr);

      csvTableTimer.Stop();

CSV 文件非常大(10+GB),且没有 header。我正在使用 class 来构建 DataTable 的结构,并希望在填充 DataTable 行时继续沿用该方法,因为之后还需要扩展以支持多种 CSV 类型。

DataTable 的列名取自 class 中与 SQL 数据库表对齐的属性名。我曾想用 GetField(取转换后的值,而非原始值)遍历 DataTable 的每一列:row[column.ColumnName] = csv.GetField(column.DataType, column.ColumnName);,但因为文件没有 header 而不断报错。我还发现了一个与 HasHeaderRecord = false 相关、且与我的做法相符的未解决 issue,这更促使我向更熟练的人请教。感谢您的帮助!

在代码块上展开;

     // CsvHelper configuration for headerless, comma-delimited files.
     // isBadRecord / ErrRecords / badCount are captured from the enclosing scope.
     var rconfig = new CsvHelper.Configuration.CsvConfiguration(CultureInfo.InvariantCulture)
     {
         // NOTE(review): 1024 bytes is a very small read buffer for 10+ GB
         // files — consider a much larger value (e.g. 64K or more) and measure.
         BufferSize = 1024,
         Delimiter = ",",
         AllowComments = true,       // lines starting with the Comment char are skipped
         HasHeaderRecord = false,    // files carry no header row; mapping is positional
         HeaderValidated = null,     // nothing to validate without a header
         IgnoreBlankLines = true,
         MissingFieldFound = null,   // don't throw when a row has fewer fields
         Comment = '#',
         Escape = '"',
         TrimOptions = TrimOptions.Trim,
         // Invoked by CsvHelper when a field can't be parsed: flag the record
         // so the read loop skips it, and keep the raw text for diagnostics.
         BadDataFound = x =>
         {
             isBadRecord = true;
             ErrRecords.Add(x.RawRecord);
             ++badCount;
         }
     };

     var loadFType = @"B";
     // Create datatable using class as definition.
     PropertyDescriptorCollection props1 = TypeDescriptor.GetProperties(loaderFileType);
     // NOTE(review): this DataTable is overwritten on the next line — the first
     // allocation is dead code.
     DataTable dt = new DataTable();
     dt = UtilExtensions.CreateDataTable(props1);

     using (var reader = new StreamReader(rFile))
     {
         // NOTE(review): with HasHeaderRecord = false this discards the FIRST
         // DATA row of the file — one record is silently lost.
         reader.ReadLine();
          
         using (var csv = new CsvReader(reader, rconfig))
         {
             // NOTE(review): loadFType is "B", which matches none of the cases
             // below, so no class map is registered by this switch; the per-case
             // lists (allRecords/balRecords/cifRecords) are declared and
             // immediately go out of scope — they are never used.
             switch (loadFType)
             {
                 case "ALL":
                     csv.Context.RegisterClassMap<CSVLoader.AMap>();
                     var allRecords = new List<CSVLoader.A>();
                     break;
                 case "BAL":
                     csv.Context.RegisterClassMap<CSVLoader.BMap>();
                     var balRecords = new List<CSVLoader.B>();
                     break;

                 case "CIF":
                     csv.Context.RegisterClassMap<CSVLoader.CMap>();
                     var cifRecords = new List<CSVLoader.C>();
                     break;
             }

             dt.BeginLoadData();
             while (csv.Read())
             {
                 csvReadTimer.Start();
                 var row = csv.GetRecord(loaderFileType);
                 csvReadTimer.Stop();

                 runningCount++;

                 if (!isBadRecord)
                 {
                      csvTableTimer.Start();
                      // Process row and populate datatable
                      DataRow dr = dt.NewRow();

                      // NOTE(review): two reflection lookups per column per row
                      // (null test + assignment) — this is the hot spot measured
                      // by csvTableTimer. Cache the value in a local, or use
                      // precompiled getter delegates.
                      foreach (DataColumn dc in dt.Columns)
                      {
                          if (row.GetType().GetProperty(dc.ToString()).GetValue(row) != null)
                          {
                              dr[dc.ToString()] = row.GetType().GetProperty(dc.ToString()).GetValue(row);
                          }
                      }
                     dt.Rows.Add(dr);

                     csvTableTimer.Stop();
                     ++goodCount;

                     // NOTE(review): batchCount is compared BEFORE the increment
                     // further down, so each flushed batch actually holds
                     // dtbatchSize + 1 rows.
                     if (batchCount >= dtbatchSize || runningCount >= fileRecCount)
                     {
                         try
                         {
                             // Write from the source to the destination.
                             bcpLoadTimer.Start();

                             bulkCopy.WriteToServer(dt);

                             bcpLoadTimer.Stop();
                             bcpLoadBatchCount++;

                         }
                         // NOTE(review): this empty catch swallows bulk-copy
                         // failures; the rows are then cleared and silently lost.
                         // At minimum log ex and consider rethrowing.
                         catch (Exception ex)
                         {
                         }
                         dt.Clear();
                         batchCount = 0;
                     }
                     batchCount++;
                 }
                 isBadRecord = false;
             }
             dt.EndLoadData();
             reader.Close();
             // NOTE(review): dt is cleared here without a final WriteToServer —
             // presumably the in-loop runningCount >= fileRecCount check flushed
             // the tail; confirm fileRecCount is accurate or rows are dropped.
             dt.Clear();
             transaction.Commit();
// B
/// <summary>
/// Record type for the "B" CSV layout. Each property maps to a CSV column by
/// zero-based position via the CsvHelper [Index] attribute (required because
/// the files have no header row). Nullable types allow empty CSV fields.
/// </summary>
public class B
{
    [Index(0)]
    public string A { get; set; }
    [Index(1)]
    public string BString { get; set; }
    [Index(2)]
    public int? C { get; set; }
    [Index(3)]
    public string D { get; set; }
    [Index(4)]
    public string E { get; set; }
    [Index(5)]
    public DateTime? F { get; set; }
    [Index(6)]
    public decimal? G { get; set; }
    [Index(7)]
    public decimal? H { get; set; }
    [Index(8)]
    public decimal? I { get; set; }
    [Index(9)]
    public decimal? J { get; set; }
    [Index(10)]
    public int? K { get; set; }
    [Index(11)]
    public string L { get; set; }
    [Index(12)]
    public DateTime? M { get; set; }
}

// B
/// <summary>
/// CsvHelper class map for <see cref="B"/>: binds every property to its CSV
/// column by zero-based index, since the files carry no header row.
/// </summary>
public sealed class BMap : ClassMap<B>
{
    public BMap()
    {
        // AutoMap(CultureInfo.InvariantCulture);
        Map(m => m.A).Index(0);
        Map(m => m.BString).Index(1); 
        Map(m => m.C).Index(2);
        Map(m => m.D).Index(3);
        Map(m => m.E).Index(4);
        // F arrives as a compact date, e.g. 20230131.
        Map(m => m.F).Index(5).TypeConverterOption.Format("yyyyMMdd");
        Map(m => m.G).Index(6);
        Map(m => m.H).Index(7);
        Map(m => m.I).Index(8);
        Map(m => m.J).Index(9);
        Map(m => m.K).Index(10);
        Map(m => m.L).Index(11);
        // NOTE(review): "hh" is the 12-hour clock specifier; if the source
        // timestamps are 24-hour (typical for DB2-style dumps) this should be
        // "HH" — confirm against real data.
        Map(m => m.M).Index(12).TypeConverterOption.Format("yyyy-MM-dd-hh.mm.ss.ffffff");
    }
}

您的问题实际上并未包含 minimal reproducible example,因此我简化了您的代码,创建了下面的 FileLoader class,用来统计从 CsvReader 读取某个 class TClass(此处为 B)的实例并填充 DataTable 所需的时间:
public class FileLoader
{
    /// <summary>Accumulates time spent populating the DataTable (excludes CSV parsing and flushing).</summary>
    public System.Diagnostics.Stopwatch csvTableTimer { get; } = new();

    /// <summary>
    /// Reads the CSV file <paramref name="rFile"/> into a DataTable shaped after
    /// <typeparamref name="TClass"/>, flushing via <see cref="FlushTable"/> every
    /// <paramref name="dtbatchSize"/> good rows.
    /// </summary>
    /// <typeparam name="TClass">Record type; its read/write properties define the DataTable columns.</typeparam>
    /// <typeparam name="TClassMap">CsvHelper class map used to parse each row into TClass.</typeparam>
    /// <returns>The number of good (non-bad) records loaded.</returns>
    public long Load<TClass, TClassMap>(string rFile, int dtbatchSize) where TClassMap : ClassMap<TClass>, new()
    {
        bool isBadRecord = false;
        long badCount = 0;
        long runningCount = 0;
        long goodCount = 0;
        long batchCount = 0;

        // BadDataFound callback flags the current record so the loop skips it.
        var rconfig = CreateCsvConfiguration(
            x => 
            {
                isBadRecord = true;
                //ErrRecords.Add(x.RawRecord);
                ++badCount;
            });
        
        // Create datatable using class as definition.
        var dt = UtilExtensions.CreateDataTable(typeof(TClass));

        using (var reader = new StreamReader(rFile))
        {
            //reader.ReadLine();  FIXED - THIS SKIPPED THE FIRST LINE AND CAUSED A RECORD TO BE OMITTED.
            using (var csv = new CsvReader(reader, rconfig))
            {
                csv.Context.RegisterClassMap<TClassMap>();

                dt.BeginLoadData();
                while (csv.Read())
                {
                    isBadRecord = false;
                    //csvReadTimer.Start();
                    var record = csv.GetRecord<TClass>();
                    //csvReadTimer.Stop();

                    runningCount++;
                    if (!isBadRecord)
                    {
                        csvTableTimer.Start();
                        // Process row and populate datatable
                        DataRow dr = dt.NewRow();
                        foreach (DataColumn dc in dt.Columns)
                        {
                            // NOTE(review): reflection runs TWICE per column here
                            // (null test + assignment) — this is the hot spot that
                            // csvTableTimer measures; later demos cache the value
                            // and then replace reflection with compiled delegates.
                            if (record.GetType().GetProperty(dc.ToString()).GetValue(record) != null)
                            {
                                dr[dc.ToString()] = record.GetType().GetProperty(dc.ToString()).GetValue(record);
                            }
                        }
                        dt.Rows.Add(dr);
                        csvTableTimer.Stop();
                        goodCount++;
                        if (++batchCount >= dtbatchSize)
                        {
                            // Flush the data table
                            FlushTable(dt);
                            batchCount = 0;
                        }
                    }
                }
                dt.EndLoadData();
                FlushTable(dt);  // flush the final partial batch
                Commit();
            }
        }
        
        return goodCount;
    }

    // Test stand-ins: subclasses override these with real bulk-copy/commit logic.
    protected virtual void FlushTable(DataTable dt) => dt.Clear();  // Replace with SqlBulkCopy 
    protected virtual void Commit() {} // Replace with transaction.Commit();
    
    /// <summary>Builds the shared CsvHelper configuration for headerless, comma-delimited files.</summary>
    public static CsvConfiguration CreateCsvConfiguration(BadDataFound badDataFound) => 
        new CsvHelper.Configuration.CsvConfiguration(CultureInfo.InvariantCulture)
        {
            BufferSize = 1024,  // NOTE(review): small for very large files — worth tuning
            Delimiter = ",",
            AllowComments = true,
            HasHeaderRecord = false,
            HeaderValidated = null,
            IgnoreBlankLines = true,
            MissingFieldFound = null,
            Comment = '#',
            Escape = '"',
            TrimOptions = TrimOptions.Trim,
            BadDataFound = badDataFound,
        };
}

public static partial class UtilExtensions
{
    // The properties that can round-trip through a DataTable: public,
    // non-indexed, with both a public getter and a public setter.
    static IEnumerable<PropertyInfo> GetSerializableProperties(this Type type)
    {
        return type.GetProperties()
                   .Where(p => p.GetIndexParameters().Length == 0
                               && p.CanRead
                               && p.CanWrite
                               && p.GetGetMethod() != null
                               && p.GetSetMethod() != null);
    }

    // Builds an empty DataTable whose columns mirror the serializable
    // properties of the given type, one column per property.
    public static DataTable CreateDataTable(Type type)
    {
        var table = new DataTable();
        foreach (var property in type.GetSerializableProperties())
        {
            // DataColumn cannot hold Nullable<T>; store the underlying type and
            // let the column represent missing values as DBNull instead.
            var columnType = Nullable.GetUnderlyingType(property.PropertyType) ?? property.PropertyType;
            table.Columns.Add(property.Name, columnType);
        }
        return table;
    }
}

然后,如果我使用文件加载器并调用 loader.Load<B, BMap>(rFile, 1000) 读取 5555 行的 CSV 文件 20 次,在 dotnetfiddle 上大约需要 1049 毫秒。请参阅演示 #1 here

您遇到的一个问题是 c# 中的反射可能非常慢。您正在调用 record.GetType().GetProperty(dc.ToString()).GetValue(record) 两次,如果我简单地将调用次数减少 1,则时间会减少到 706 毫秒左右:

                        // Cache the reflected value in a local: GetProperty/
                        // GetValue is expensive, and the original version called
                        // the pair twice per column (null test + assignment).
                        foreach (DataColumn dc in dt.Columns)
                        {
                            var value = record.GetType().GetProperty(dc.ToString()).GetValue(record);
                            if (value != null)
                            {
                                dr[dc.ToString()] = value;
                            }
                        }

演示 #2 here.

但是,我们可以通过在运行时制造委托来做得更好。首先,添加以下使用 System.Linq.Expressions 命名空间的实用方法:

public static partial class UtilExtensions
{
    // Compiles a delegate equivalent to: (TSource obj) => (object)obj.Property.
    // Compiling once and caching avoids per-row reflection cost.
    public static Func<TSource, object> CreatePropertyGetter<TSource>(PropertyInfo propertyInfo)
    {
        var source = Expression.Parameter(typeof(TSource), "obj");
        var access = Expression.Property(source, propertyInfo);
        var boxed = Expression.Convert(access, typeof(object));

        return Expression.Lambda<Func<TSource, object>>(boxed, source).Compile();
    }

    // Returns the cached name -> getter map for TSource; built once per type.
    public static ReadOnlyDictionary<string, Func<TSource, object>> PropertyGetters<TSource>() => PropertyExpressionsCache<TSource>.PropertyGetters;

    // Builds a getter delegate for every serializable property of TSource.
    static ReadOnlyDictionary<string, Func<TSource, object>> CreatePropertyGetters<TSource>()
    {
        var getters = typeof(TSource)
            .GetSerializableProperties()
            .ToDictionary(p => p.Name, p => CreatePropertyGetter<TSource>(p));
        return getters.ToReadOnly();
    }

    // A generic static class gives one lazily-initialized cache per TSource.
    static class PropertyExpressionsCache<TSource>
    {
        public static ReadOnlyDictionary<string, Func<TSource, object>> PropertyGetters { get; } = UtilExtensions.CreatePropertyGetters<TSource>();
    }

    // Wraps a dictionary in a read-only view, rejecting null input.
    public static ReadOnlyDictionary<TKey, TValue> ToReadOnly<TKey, TValue>(this IDictionary<TKey, TValue> dictionary)
    {
        if (dictionary is null)
            throw new ArgumentNullException();
        return new ReadOnlyDictionary<TKey, TValue>(dictionary);
    }
}

并修改Load<TClass, TClassMap>()如下:

/// <summary>
/// Reads the CSV file <paramref name="rFile"/> into a DataTable shaped after
/// <typeparamref name="TClass"/>, using cached compiled property getters
/// instead of per-row reflection, and flushing every
/// <paramref name="dtbatchSize"/> good rows.
/// </summary>
/// <returns>The number of good (non-bad) records loaded.</returns>
public long Load<TClass, TClassMap>(string rFile, int dtbatchSize) where TClassMap : ClassMap<TClass>, new()
{
    bool isBadRecord = false;
    long badCount = 0;
    long runningCount = 0;
    long goodCount = 0;
    long batchCount = 0;

    // BadDataFound callback flags the current record so the loop skips it.
    var rconfig = CreateCsvConfiguration(
        x => 
        {
            isBadRecord = true;
            //ErrRecords.Add(x.RawRecord);
            ++badCount;
        });
    
    var loaderFileType = typeof(TClass);

    // Create datatable using class as definition.
    var dt = UtilExtensions.CreateDataTable(loaderFileType);
    // Name -> compiled getter delegates, built once per TClass and cached —
    // this replaces the slow GetType().GetProperty().GetValue() chain.
    var properties = UtilExtensions.PropertyGetters<TClass>();

    using (var reader = new StreamReader(rFile))
    {
        //reader.ReadLine();  FIXED - THIS SKIPPED THE FIRST LINE AND CAUSED A RECORD TO BE OMITTED.
        using (var csv = new CsvReader(reader, rconfig))
        {
            csv.Context.RegisterClassMap<TClassMap>();

            dt.BeginLoadData();
            while (csv.Read())
            {
                isBadRecord = false;
                //csvReadTimer.Start();
                var record = csv.GetRecord<TClass>();
                //csvReadTimer.Stop();

                runningCount++;
                if (!isBadRecord)
                {
                    csvTableTimer.Start();
                    // Process row and populate datatable
                    DataRow dr = dt.NewRow();
                    // One delegate invocation per column — no reflection here.
                    foreach (var p in properties)
                    {
                        var value = p.Value(record);
                        if (value != null)
                            dr[p.Key] =  value;
                    }
                    dt.Rows.Add(dr);
                    csvTableTimer.Stop();
                    goodCount++;
                    if (++batchCount >= dtbatchSize)
                    {
                        // Flush the data table
                        FlushTable(dt);
                        batchCount = 0;
                    }
                }
            }
            dt.EndLoadData();
            FlushTable(dt);  // flush the final partial batch
        }
    }
    
    return goodCount;
}

时间将进一步减少,大约为 404 毫秒。演示 fiddle #3 here.

我也试过使用 Delegate.CreateDelegate() 而不是 Expression:

public static partial class UtilExtensions
{
    /// <summary>
    /// Builds a boxing wrapper over a strongly typed getter delegate.
    /// Delegate.CreateDelegate binds directly to the property's get accessor,
    /// so the only per-call overhead is the cast to object.
    /// </summary>
    static Func<TSource, object> CreateTypedPropertyGetter<TSource, TValue>(PropertyInfo propertyInfo)
    {
        var typedFunc = (Func<TSource, TValue>)Delegate.CreateDelegate(typeof(Func<TSource, TValue>), propertyInfo.GetGetMethod());
        return i => (object)typedFunc(i);
    }

    /// <summary>
    /// Creates an untyped getter for <paramref name="propertyInfo"/> by closing
    /// <see cref="CreateTypedPropertyGetter"/> over the property's actual type.
    /// </summary>
    public static Func<TSource, object> CreatePropertyGetter<TSource>(PropertyInfo propertyInfo)
    {
        // NonPublic is required: CreateTypedPropertyGetter is private.
        var typedCreator = typeof(UtilExtensions).GetMethod(nameof(CreateTypedPropertyGetter), BindingFlags.Static | BindingFlags.Public | BindingFlags.NonPublic);
        // FIXED: removed the redundant "typedCreator = " double assignment.
        var concreteTypedCreator = typedCreator.MakeGenericMethod(typeof(TSource), propertyInfo.PropertyType);
        return (Func<TSource, object>)concreteTypedCreator.Invoke(null, new object [] { propertyInfo });
    }

    /// <summary>Returns the cached name -> getter map for TSource; built once per type.</summary>
    public static ReadOnlyDictionary<string, Func<TSource, object>> PropertyGetters<TSource>() => PropertyExpressionsCache<TSource>.PropertyGetters;

    // Builds a getter delegate for every serializable property of TSource.
    static ReadOnlyDictionary<string, Func<TSource, object>> CreatePropertyGetters<TSource>() =>
        typeof(TSource)
            .GetSerializableProperties()
            .ToDictionary(p => p.Name,
                          p => CreatePropertyGetter<TSource>(p))
            .ToReadOnly();

    // A generic static class gives one lazily-initialized cache per TSource.
    static class PropertyExpressionsCache<TSource>
    {
        public static ReadOnlyDictionary<string, Func<TSource, object>> PropertyGetters { get; } = UtilExtensions.CreatePropertyGetters<TSource>();
    }
    
    /// <summary>Wraps a dictionary in a read-only view, rejecting null input.</summary>
    public static ReadOnlyDictionary<TKey, TValue> ToReadOnly<TKey, TValue>(this IDictionary<TKey, TValue> dictionary) => 
        new ReadOnlyDictionary<TKey, TValue>(dictionary ?? throw new ArgumentNullException());
}

得到的时间大致相同,为 410 毫秒。演示 fiddle #4 here.

备注:

  • 您问题中的代码通过调用 reader.ReadLine(); 跳过了 CSV 文件的第一行。

    在我的测试工具中,这导致读取的记录数量不正确,因此我删除了这一行。

  • 我提取了一个将记录类型和 class 映射类型作为通用参数的通用方法,而不是使用一个对记录类型进行开关的非通用方法。这使得委托创建更容易一些,因为不再需要对记录类型进行运行时转换。