Guid.NewGuid() 在 Parallel.For 循环中使用时返回重复值
Guid.NewGuid() is returning duplicate values when used in a Parallel.For loop
我有一个应用程序遇到了 API。因此,它会查询对象中的所有 ID,然后必须针对每个 ID 一次查询一个项目。我在 Parallel.For 循环中执行此操作,并将每个项目数据添加到数据 table 中的一行。然后我使用 sqlbulkcopy 将数据 table 发送到 SQL 服务器 table。
如果我在不使用 Parallel.For 的情况下执行此操作,效果会很好。但是,对于 Parallel.For,这一行:
workrow["id"] = Guid.NewGuid();
正在生成重复的 Guid。它经常这样做并导致数据无法加载到 SQL 服务器 table 中,因为 SQL 中的 id 行是主键并且不允许重复。我试过锁定:
lock (lockobject)
{
workrow["id"] = Guid.NewGuid();
}
这没有帮助。
我尝试不为该字段分配 id,希望 SQL 会生成它(它在该字段上确实有 newid() )。那没有说它不能插入空值。
我似乎不能只从数据中删除 id 字段table 因为当我执行 sqlbulkcopy 时列不对齐。
有人可以帮我吗?我要么需要弄清楚如何让 Guid.NewGuid() 停止生成重复项,要么我需要弄清楚一种不传递 id(始终是数据 table 中的第一个字段)的方法,以便SQL 将生成 id。
这是我用来生成 table 之一的代码:
public static DataTable MakeWorkflowTable()
{
DataTable Workflow = new DataTable("Workflow");
DataColumn id = new DataColumn("id", System.Type.GetType("System.Guid"));
Workflow.Columns.Add(id);
DataColumn OrgInfoID = new DataColumn("OrgInfoID", System.Type.GetType("System.Guid"));
Workflow.Columns.Add(OrgInfoID);
DataColumn Name = new DataColumn("Name", System.Type.GetType("System.String"));
Workflow.Columns.Add(Name);
DataColumn Active = new DataColumn("Active", System.Type.GetType("System.String"));
Workflow.Columns.Add(Active);
DataColumn Description = new DataColumn("Description", System.Type.GetType("System.String"));
Workflow.Columns.Add(Description);
DataColumn Object = new DataColumn("Object", System.Type.GetType("System.String"));
Workflow.Columns.Add(Object);
DataColumn Formula = new DataColumn("Formula", System.Type.GetType("System.String"));
Workflow.Columns.Add(Formula);
DataColumn ManageableState = new DataColumn("ManageableState", System.Type.GetType("System.String"));
Workflow.Columns.Add(ManageableState);
DataColumn NameSpacePrefix = new DataColumn("NameSpacePrefix", System.Type.GetType("System.String"));
Workflow.Columns.Add(NameSpacePrefix);
DataColumn TDACount = new DataColumn("TDACount", System.Type.GetType("System.Int32"));
Workflow.Columns.Add(TDACount);
DataColumn TriggerType = new DataColumn("TriggerType", System.Type.GetType("System.String"));
Workflow.Columns.Add(TriggerType);
DataColumn CreatedDate = new DataColumn("CreatedDate", System.Type.GetType("System.DateTime"));
Workflow.Columns.Add(CreatedDate);
DataColumn CreatedBy = new DataColumn("CreatedBy", System.Type.GetType("System.String"));
Workflow.Columns.Add(CreatedBy);
DataColumn LastModifiedDate = new DataColumn("LastModifiedDate", System.Type.GetType("System.DateTime"));
Workflow.Columns.Add(LastModifiedDate);
DataColumn LastModifiedBy = new DataColumn("LastModifiedBy", System.Type.GetType("System.String"));
Workflow.Columns.Add(LastModifiedBy);
return Workflow;
}
这是我用来将它发送到 SQL 服务器的代码:
public static void SendDTtoDB(ref DataTable dt, ref SqlConnection cnn, string TableName)
{
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(cnn))
{
bulkCopy.DestinationTableName =
TableName;
try
{
bulkCopy.WriteToServer(dt);
dt.Clear();
}
catch (Exception e)
{
logger.Warn("SendDTtoDB {TableName}: ORGID: {ORGID} : {Message}", TableName, dt.Rows[0]["OrgInfoID"], e.Message.ToString());
if (e.Message.ToString().Contains("PRIMARY KEY"))
{
foreach(DataRow row in dt.Rows)
{
logger.Warn("ID: {id}", row["id"]);
}
}
}
}
}
正如您在 catch 语句中看到的那样,我将其设置为将 ID 写到日志中,这样我就可以自己查看它们,果然那里有一个重复项。太令人沮丧了!如果不需要的话,我真的不想取出 Parallel.For 和单线程。
根据请求,这里是带有 Parallel.For
的代码
if (qr.totalSize > 0)
{
object lockobject = new object();
Parallel.For(0, qr.records.Length, i =>
{
ToolingService.CustomTab1 vr = new ToolingService.CustomTab1();
vr = (ToolingService.CustomTab1)qr.records[i];
string mdSOQL = "Select FullName, description, ManageableState, MasterLabel, NamespacePrefix, Type, Url, CreatedDate, CreatedBy.Name, "
+ "LastModifiedDate, LastModifiedBy.Name From CustomTab where id='" + vr.Id + "'";
ToolingService.QueryResult mdqr = new ToolingService.QueryResult();
ToolingService.CustomTab1 vrmd = new ToolingService.CustomTab1();
mdqr = ts.query(mdSOQL);
vrmd = (ToolingService.CustomTab1)mdqr.records[0];
DataRow workrow = CustomTabs.NewRow();
lock (lockobject)
{
workrow["id"] = Guid.NewGuid();
}
workrow["OrgInfoID"] = _orgDBID;
workrow["FullName"] = vrmd.FullName;
workrow["Description"] = vrmd.Description ?? Convert.DBNull;
workrow["ManageableState"] = vrmd.ManageableState;
workrow["MasterLabel"] = vrmd.MasterLabel ?? Convert.DBNull;
workrow["NameSpacePrefix"] = vrmd.NamespacePrefix ?? Convert.DBNull;
workrow["Type"] = vrmd.Type ?? Convert.DBNull;
workrow["URL"] = vrmd.Url ?? Convert.DBNull;
workrow["CreatedDate"] = vrmd.CreatedDate ?? Convert.DBNull;
if (vrmd.CreatedBy == null)
{
workrow["CreatedBy"] = Convert.DBNull;
}
else
{
workrow["CreatedBy"] = vrmd.CreatedBy.Name;
}
workrow["LastModifiedDate"] = vrmd.LastModifiedDate ?? Convert.DBNull;
if (vrmd.LastModifiedBy == null)
{
workrow["LastModifiedBy"] = Convert.DBNull;
}
else
{
workrow["LastModifiedBy"] = vrmd.LastModifiedBy.Name;
}
lock (CustomTabs)
{
CustomTabs.Rows.Add(workrow);
}
});
OrgTables.SendDTtoDB(ref CustomTabs, ref _cnn, "OrgCustomTabs");
我以前见过这个问题。 Guid.NewGuid()
没有问题,但是 DataTable
不是线程安全的!
DataTable is simply not designed or intended for concurrent usage (in particular where there is any form of mutation involved).
见Thread safety for DataTable
还相关:c# DataGridView DataTable internal index corrupted in parallel loop
It's done in a loop because I have to query the API over and over to get each item in the object. I'm doing the Parallel.For loop so I can speed up the process. If I need to get 450 items or more one at a time, I'd like to multithread that for speed purposes. The database access isn't being done in the loop, just building the datatable is because once I get the data back from the API I need to store it.
您可以创建一个类型并将它们以多线程方式添加到 ConcurrentBag<T>
或 ConcurrentQueue<T>
(或另一个并发集合,请参阅 MS Docs)——它们是线程安全的 :)
然后您可以使用单线程构建 DataTable
。或者,如果您的用例可能的话,可以跳过整个数据表。
问题在于,必须在 DataTable 的 Parallel.ForEach
中使用 lock
,这首先违背了使用 Parallel.ForEach
的目的;但是,令我惊讶的是,当您调用 DataRow workrow = CustomTabs.NewRow();
时您没有得到异常,因为在我的测试中,我得到了一个索引损坏的异常。实际上,我不得不将对 NewRow
的调用包装在一个锁中。像这样:
Parallel.ForEach(data, x =>
{
DataRow row = null;
lock (lockRow)
{
row = dt.NewRow();
row["Guid"] = Guid.NewGuid();
}
...
lock(lockObj)
dt.Rows.Add(row);
其中 lockObj
和 lockRow
是实例化为
的 2 个单独的静态对象
static object lockObj = new object();
static object lockRow = new object();
这对我有用,将 100 万行添加到 DataTable
并确保所有 Guid 都是唯一的。
综上所述,我强烈建议按照 Julian 的建议编写代码或创建一个实现 IDataReader
的 class(您可以将其与 SQLBulkCopy 一起使用)并使用上传数据那。
我使用了每次访问时都锁定的 guidgenerator。
lock (guidGenerator)
{
entity.Id = guidGenerator.NewGuid();
}
对我来说效果很好。不过这是一种不同的方法。
我有一个应用程序遇到了 API。因此,它会查询对象中的所有 ID,然后必须针对每个 ID 一次查询一个项目。我在 Parallel.For 循环中执行此操作,并将每个项目数据添加到数据 table 中的一行。然后我使用 sqlbulkcopy 将数据 table 发送到 SQL 服务器 table。
如果我在不使用 Parallel.For 的情况下执行此操作,效果会很好。但是,对于 Parallel.For,这一行:
workrow["id"] = Guid.NewGuid();
正在生成重复的 Guid。它经常这样做并导致数据无法加载到 SQL 服务器 table 中,因为 SQL 中的 id 行是主键并且不允许重复。我试过锁定:
lock (lockobject)
{
workrow["id"] = Guid.NewGuid();
}
这没有帮助。
我尝试不为该字段分配 id,希望 SQL 会生成它(它在该字段上确实有 newid() )。那没有说它不能插入空值。
我似乎不能只从数据中删除 id 字段table 因为当我执行 sqlbulkcopy 时列不对齐。
有人可以帮我吗?我要么需要弄清楚如何让 Guid.NewGuid() 停止生成重复项,要么我需要弄清楚一种不传递 id(始终是数据 table 中的第一个字段)的方法,以便SQL 将生成 id。
这是我用来生成 table 之一的代码:
public static DataTable MakeWorkflowTable()
{
DataTable Workflow = new DataTable("Workflow");
DataColumn id = new DataColumn("id", System.Type.GetType("System.Guid"));
Workflow.Columns.Add(id);
DataColumn OrgInfoID = new DataColumn("OrgInfoID", System.Type.GetType("System.Guid"));
Workflow.Columns.Add(OrgInfoID);
DataColumn Name = new DataColumn("Name", System.Type.GetType("System.String"));
Workflow.Columns.Add(Name);
DataColumn Active = new DataColumn("Active", System.Type.GetType("System.String"));
Workflow.Columns.Add(Active);
DataColumn Description = new DataColumn("Description", System.Type.GetType("System.String"));
Workflow.Columns.Add(Description);
DataColumn Object = new DataColumn("Object", System.Type.GetType("System.String"));
Workflow.Columns.Add(Object);
DataColumn Formula = new DataColumn("Formula", System.Type.GetType("System.String"));
Workflow.Columns.Add(Formula);
DataColumn ManageableState = new DataColumn("ManageableState", System.Type.GetType("System.String"));
Workflow.Columns.Add(ManageableState);
DataColumn NameSpacePrefix = new DataColumn("NameSpacePrefix", System.Type.GetType("System.String"));
Workflow.Columns.Add(NameSpacePrefix);
DataColumn TDACount = new DataColumn("TDACount", System.Type.GetType("System.Int32"));
Workflow.Columns.Add(TDACount);
DataColumn TriggerType = new DataColumn("TriggerType", System.Type.GetType("System.String"));
Workflow.Columns.Add(TriggerType);
DataColumn CreatedDate = new DataColumn("CreatedDate", System.Type.GetType("System.DateTime"));
Workflow.Columns.Add(CreatedDate);
DataColumn CreatedBy = new DataColumn("CreatedBy", System.Type.GetType("System.String"));
Workflow.Columns.Add(CreatedBy);
DataColumn LastModifiedDate = new DataColumn("LastModifiedDate", System.Type.GetType("System.DateTime"));
Workflow.Columns.Add(LastModifiedDate);
DataColumn LastModifiedBy = new DataColumn("LastModifiedBy", System.Type.GetType("System.String"));
Workflow.Columns.Add(LastModifiedBy);
return Workflow;
}
这是我用来将它发送到 SQL 服务器的代码:
public static void SendDTtoDB(ref DataTable dt, ref SqlConnection cnn, string TableName)
{
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(cnn))
{
bulkCopy.DestinationTableName =
TableName;
try
{
bulkCopy.WriteToServer(dt);
dt.Clear();
}
catch (Exception e)
{
logger.Warn("SendDTtoDB {TableName}: ORGID: {ORGID} : {Message}", TableName, dt.Rows[0]["OrgInfoID"], e.Message.ToString());
if (e.Message.ToString().Contains("PRIMARY KEY"))
{
foreach(DataRow row in dt.Rows)
{
logger.Warn("ID: {id}", row["id"]);
}
}
}
}
}
正如您在 catch 语句中看到的那样,我将其设置为将 ID 写到日志中,这样我就可以自己查看它们,果然那里有一个重复项。太令人沮丧了!如果不需要的话,我真的不想取出 Parallel.For 和单线程。
根据请求,这里是带有 Parallel.For
的代码 if (qr.totalSize > 0)
{
object lockobject = new object();
Parallel.For(0, qr.records.Length, i =>
{
ToolingService.CustomTab1 vr = new ToolingService.CustomTab1();
vr = (ToolingService.CustomTab1)qr.records[i];
string mdSOQL = "Select FullName, description, ManageableState, MasterLabel, NamespacePrefix, Type, Url, CreatedDate, CreatedBy.Name, "
+ "LastModifiedDate, LastModifiedBy.Name From CustomTab where id='" + vr.Id + "'";
ToolingService.QueryResult mdqr = new ToolingService.QueryResult();
ToolingService.CustomTab1 vrmd = new ToolingService.CustomTab1();
mdqr = ts.query(mdSOQL);
vrmd = (ToolingService.CustomTab1)mdqr.records[0];
DataRow workrow = CustomTabs.NewRow();
lock (lockobject)
{
workrow["id"] = Guid.NewGuid();
}
workrow["OrgInfoID"] = _orgDBID;
workrow["FullName"] = vrmd.FullName;
workrow["Description"] = vrmd.Description ?? Convert.DBNull;
workrow["ManageableState"] = vrmd.ManageableState;
workrow["MasterLabel"] = vrmd.MasterLabel ?? Convert.DBNull;
workrow["NameSpacePrefix"] = vrmd.NamespacePrefix ?? Convert.DBNull;
workrow["Type"] = vrmd.Type ?? Convert.DBNull;
workrow["URL"] = vrmd.Url ?? Convert.DBNull;
workrow["CreatedDate"] = vrmd.CreatedDate ?? Convert.DBNull;
if (vrmd.CreatedBy == null)
{
workrow["CreatedBy"] = Convert.DBNull;
}
else
{
workrow["CreatedBy"] = vrmd.CreatedBy.Name;
}
workrow["LastModifiedDate"] = vrmd.LastModifiedDate ?? Convert.DBNull;
if (vrmd.LastModifiedBy == null)
{
workrow["LastModifiedBy"] = Convert.DBNull;
}
else
{
workrow["LastModifiedBy"] = vrmd.LastModifiedBy.Name;
}
lock (CustomTabs)
{
CustomTabs.Rows.Add(workrow);
}
});
OrgTables.SendDTtoDB(ref CustomTabs, ref _cnn, "OrgCustomTabs");
我以前见过这个问题。 Guid.NewGuid()
没有问题,但是 DataTable
不是线程安全的!
DataTable is simply not designed or intended for concurrent usage (in particular where there is any form of mutation involved).
见Thread safety for DataTable
还相关:c# DataGridView DataTable internal index corrupted in parallel loop
It's done in a loop because I have to query the API over and over to get each item in the object. I'm doing the Parallel.For loop so I can speed up the process. If I need to get 450 items or more one at a time, I'd like to multithread that for speed purposes. The database access isn't being done in the loop, just building the datatable is because once I get the data back from the API I need to store it.
您可以创建一个类型并将它们以多线程方式添加到 ConcurrentBag<T>
或 ConcurrentQueue<T>
(或另一个并发集合,请参阅 MS Docs)——它们是线程安全的 :)
然后您可以使用单线程构建 DataTable
。或者,如果您的用例可能的话,可以跳过整个数据表。
问题在于,必须在 DataTable 的 Parallel.ForEach
中使用 lock
,这首先违背了使用 Parallel.ForEach
的目的;但是,令我惊讶的是,当您调用 DataRow workrow = CustomTabs.NewRow();
时您没有得到异常,因为在我的测试中,我得到了一个索引损坏的异常。实际上,我不得不将对 NewRow
的调用包装在一个锁中。像这样:
Parallel.ForEach(data, x =>
{
DataRow row = null;
lock (lockRow)
{
row = dt.NewRow();
row["Guid"] = Guid.NewGuid();
}
...
lock(lockObj)
dt.Rows.Add(row);
其中 lockObj
和 lockRow
是实例化为
static object lockObj = new object();
static object lockRow = new object();
这对我有用,将 100 万行添加到 DataTable
并确保所有 Guid 都是唯一的。
综上所述,我强烈建议按照 Julian 的建议编写代码或创建一个实现 IDataReader
的 class(您可以将其与 SQLBulkCopy 一起使用)并使用上传数据那。
我使用了每次访问时都锁定的 guidgenerator。
lock (guidGenerator)
{
entity.Id = guidGenerator.NewGuid();
}
对我来说效果很好。不过这是一种不同的方法。