SqlBulkCopy 数据表,因为它们被添加到数据集
SqlBulkCopy DataTables as they are added to a DataSet
我想将 csv 中的值解析为数据table 块,将它们添加到数据集,然后使用 SQLBulkCopy 将数据table 插入到SQL 中的单个 table。原始 csv 的范围可以从 4 GB 到 8 GB,我需要避免将整个内容读入内存,因此要避免分块。我松散地将我的分块基于此 post. I use LumenWorks 来解析 csv 值。
一旦数据table被添加到数据集中,我想使用SqlBulkCopy将其插入到我的SQLtable中,而下一个数据[=25] =] 正在创建。 SqlBulkCopy 完成后,我想删除数据table 以释放内存。
我的第一个想法是 运行 不等待的异步分块方法,然后 运行 一个 while 循环来检查数据集中是否存在下一个数据 table。如果datatable存在,则批量复制。如果数据table 行计数小于行限制,则它是最后一个块并停止 while 循环。
我是不是走错了路?如果没有,我怎么能做这样的事情?
string filePath = @"C:\Users\user\Downloads\Testing\file - Copy.csv";
DataSet ds = new DataSet();
bool continueInsert = true;
int rowLimit = 100000;
int tableNumber = 0;
//Start this, but do not wait for it to complete before starting while loop
ChunkCSV(filePath, ds, rowLimit);
//Run SqlBulkCopy if datatable exists
while (continueInsert)
{
if (ds.Tables.Contains("tbl_" + tableNumber))
{
DataTable dataTable = ds.Tables["tbl_" + tableNumber];
//SqlBulkCopy dataTable code HERE
if (ds.Tables["tbl_" + tableNumber].Rows.Count < rowLimit)
{
continueInsert = false;
}
//Remove datatable from dataset to release memory
ds.Tables.Remove("tbl_" + tableNumber);
tableNumber++;
}
else
{
Thread.Sleep(1000);
}
}
这是我的分块代码:
private static void ChunkCSV(string filePath, DataSet dataSet, int rowLimit)
{
char delimiter = ',';
DataTable dtChunk = null;
int tableNumber = 0;
int chunkRowCount = 0;
bool firstLineOfChunk = true;
using (var sr = new StreamReader(filePath))
using (CsvReader csv = new CsvReader(sr, false, delimiter, '\"', '[=11=]', '[=11=]', ValueTrimmingOptions.All, 65536))
{
int fieldCount = csv.FieldCount;
string[] row = new string[fieldCount];
//Add fields when necessary
csv.MissingFieldAction = MissingFieldAction.ReplaceByEmpty;
while (csv.ReadNextRecord())
{
if (firstLineOfChunk)
{
firstLineOfChunk = false;
dtChunk = CreateDataTable(fieldCount, tableNumber);
}
DataRow dataRow = dtChunk.NewRow();
csv.CopyCurrentRecordTo(row);
for (int f = 0; f < fieldCount; f++)
{
dataRow[f] = row[f];
}
dtChunk.Rows.Add(dataRow);
chunkRowCount++;
if (chunkRowCount == rowLimit)
{
firstLineOfChunk = true;
chunkRowCount = 0;
tableNumber++;
dataSet.Tables.Add(dtChunk);
dtChunk = null;
}
}
}
if (dtChunk != null)
{
dataSet.Tables.Add(dtChunk);
}
}
private static DataTable CreateDataTable(int fieldCount, int tableNumber)
{
DataTable dt = new DataTable("tbl_" + tableNumber);
for(int i = 0; i < fieldCount; i++)
{
dt.Columns.Add("Column_" + i);
}
return dt;
}
没有理由一开始就使用 DataTable。
如果您不想在单个事务中加载所有行,请使用 SqlBulkCopy.WriteToServer(IDataReader) overload, and you can stream the whole file directly to SQL Server. And use SqlBulkCopy.BatchSize。
例如
using (var sr = new StreamReader(filePath))
using (CsvReader csv = new CsvReader(sr, false, delimiter, '\"', '[=10=]', '[=10=]', ValueTrimmingOptions.All, 65536))
{
bulkCopy.WriteToServer(csv);
}
我想将 csv 中的值解析为数据table 块,将它们添加到数据集,然后使用 SQLBulkCopy 将数据table 插入到SQL 中的单个 table。原始 csv 的范围可以从 4 GB 到 8 GB,我需要避免将整个内容读入内存,因此要避免分块。我松散地将我的分块基于此 post. I use LumenWorks 来解析 csv 值。
一旦数据table被添加到数据集中,我想使用SqlBulkCopy将其插入到我的SQLtable中,而下一个数据[=25] =] 正在创建。 SqlBulkCopy 完成后,我想删除数据table 以释放内存。
我的第一个想法是 运行 不等待的异步分块方法,然后 运行 一个 while 循环来检查数据集中是否存在下一个数据 table。如果datatable存在,则批量复制。如果数据table 行计数小于行限制,则它是最后一个块并停止 while 循环。
我是不是走错了路?如果没有,我怎么能做这样的事情?
string filePath = @"C:\Users\user\Downloads\Testing\file - Copy.csv";
DataSet ds = new DataSet();
bool continueInsert = true;
int rowLimit = 100000;
int tableNumber = 0;
//Start this, but do not wait for it to complete before starting while loop
ChunkCSV(filePath, ds, rowLimit);
//Run SqlBulkCopy if datatable exists
while (continueInsert)
{
if (ds.Tables.Contains("tbl_" + tableNumber))
{
DataTable dataTable = ds.Tables["tbl_" + tableNumber];
//SqlBulkCopy dataTable code HERE
if (ds.Tables["tbl_" + tableNumber].Rows.Count < rowLimit)
{
continueInsert = false;
}
//Remove datatable from dataset to release memory
ds.Tables.Remove("tbl_" + tableNumber);
tableNumber++;
}
else
{
Thread.Sleep(1000);
}
}
这是我的分块代码:
private static void ChunkCSV(string filePath, DataSet dataSet, int rowLimit)
{
char delimiter = ',';
DataTable dtChunk = null;
int tableNumber = 0;
int chunkRowCount = 0;
bool firstLineOfChunk = true;
using (var sr = new StreamReader(filePath))
using (CsvReader csv = new CsvReader(sr, false, delimiter, '\"', '[=11=]', '[=11=]', ValueTrimmingOptions.All, 65536))
{
int fieldCount = csv.FieldCount;
string[] row = new string[fieldCount];
//Add fields when necessary
csv.MissingFieldAction = MissingFieldAction.ReplaceByEmpty;
while (csv.ReadNextRecord())
{
if (firstLineOfChunk)
{
firstLineOfChunk = false;
dtChunk = CreateDataTable(fieldCount, tableNumber);
}
DataRow dataRow = dtChunk.NewRow();
csv.CopyCurrentRecordTo(row);
for (int f = 0; f < fieldCount; f++)
{
dataRow[f] = row[f];
}
dtChunk.Rows.Add(dataRow);
chunkRowCount++;
if (chunkRowCount == rowLimit)
{
firstLineOfChunk = true;
chunkRowCount = 0;
tableNumber++;
dataSet.Tables.Add(dtChunk);
dtChunk = null;
}
}
}
if (dtChunk != null)
{
dataSet.Tables.Add(dtChunk);
}
}
private static DataTable CreateDataTable(int fieldCount, int tableNumber)
{
DataTable dt = new DataTable("tbl_" + tableNumber);
for(int i = 0; i < fieldCount; i++)
{
dt.Columns.Add("Column_" + i);
}
return dt;
}
没有理由一开始就使用 DataTable。
如果您不想在单个事务中加载所有行,请使用 SqlBulkCopy.WriteToServer(IDataReader) overload, and you can stream the whole file directly to SQL Server. And use SqlBulkCopy.BatchSize。
例如
using (var sr = new StreamReader(filePath))
using (CsvReader csv = new CsvReader(sr, false, delimiter, '\"', '[=10=]', '[=10=]', ValueTrimmingOptions.All, 65536))
{
bulkCopy.WriteToServer(csv);
}